CompletableFuture异步编程
一,基本用法
java8的CompletableFuture提供了函数式编程能力,使代码更加美观优雅,而且可以通过回调的方式计算处理结果,对异常处理也有了更好的处理手段.
CompletableFuture源码中有四个静态方法用来执行异步任务:
public static CompletableFuture supplyAsync(Supplier supplier)
public static CompletableFuture supplyAsync(Supplier supplier,Executor executor)
public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable,Executor executor)
run开头的两个方法,用于执行没有返回值的任务,因为它的入参是Runnable对象,而supply开头的方法显然是执行有返回值的任务了,至于方法的入参,如果没有传入Executor对象将会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码.在实际使用中,一般我们使用自己创建的线程池对象来作为参数传入使用。
1,thenAccept()
功能:当前任务正常完成以后执行,当前任务的执行结果可以作为下一任务的输入参数,无返回值
场景:执行任务A,同时异步执行任务B,待任务B正常返回之后,用B的返回值执行任务C,任务C无返回值
public CompletionStage thenAccept(Consumer action);
public CompletionStage thenAcceptAsync(Consumer action);
public CompletionStage thenAcceptAsync(Consumer action,Executor executor);
2,thenRun()
功能:对不关心上一步的计算结果,执行下一个操作
场景:执行任务A,任务A执行完以后,执行任务B,任务B不接受任务A的返回值(不管A有没有返回值),也无返回值
public CompletionStage thenRun(Runnable action);
public CompletionStage thenRunAsync(Runnable action);
public CompletionStage thenRunAsync(Runnable action,Executor executor);
3,thenApply()
功能:当前任务正常完成以后执行,当前任务的执行的结果会作为下一任务的输入参数,有返回值
场景:多个任务串联执行,下一个任务的执行依赖上一个任务的结果,每个任务都有输入和输出
public CompletableFuture thenApply(Function fn);
public CompletableFuture thenApplyAsync(Function fn);
public CompletableFuture thenApplyAsync(Function fn, Executor executor);
实例1:异步执行任务A,当任务A完成时使用A的返回结果resultA作为入参进行任务B的处理,可实现任意多个任务的串联执行
CompletableFuture futureA = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture futureB = futureA.thenApply(s->s + " world");
CompletableFuture future3 = futureB.thenApply(String::toUpperCase);
System.out.println(future3.join());
4,thenCombine() thenAcceptBoth() runAfterBoth()
功能:结合两个CompletionStage的结果,进行转化后返回
场景:需要根据商品id查询商品的当前价格,分两步,查询商品的原始价格和折扣,这两个查询相互独立,当都查出来的时候用原始价格乘折扣,算出当前价格. 使用方法:thenCombine()
public CompletableFuture thenCombine(CompletionStage other, BiFunction fn)
public CompletableFuture thenCombineAsync(CompletionStage other, BiFunction fn)
public CompletableFuture thenCombineAsync(CompletionStage other, BiFunction fn, Executor executor)
实例1:thenCombine()是结合两个任务的返回值进行转化后再返回,那如果不需要返回,那就需要thenAcceptBoth(),同理,如果连两个任务的返回值也不关心,那就需要runAfterBoth
CompletableFuture futurePrice = CompletableFuture.supplyAsync(() -> 100d);
CompletableFuture futureDiscount = CompletableFuture.supplyAsync(() -> 0.8);
CompletableFuture futureResult = futurePrice.thenCombine(futureDiscount, (price, discount) -> price * discount);
System.out.println("最终价格为:" + futureResult.join());
5,thenCompose()
功能:这个方法接收的输入是当前的CompletableFuture的计算值,返回结果将是一个新的CompletableFuture
public CompletableFuture thenCompose(Function> fn);
public CompletableFuture thenComposeAsync(Function> fn);
public CompletableFuture thenComposeAsync(Function> fn, Executor executor);
这个方法和thenApply非常像,都是接受上一个任务的结果作为入参,执行自己的操作,然后返回.那具体有什么区别呢?
thenApply():它的功能相当于将CompletableFuture转换成CompletableFuture,改变的是同一个CompletableFuture中的泛型类型
thenCompose():用来连接两个CompletableFuture,返回值是一个新的CompletableFuture
6,applyToEither() acceptEither() runAfterEither()
功能:执行两个CompletionStage的结果,那个先执行完了,就是用哪个的返回值进行下一步操作
场景:假设查询商品a,有两种方式,A和B,但是A和B的执行速度不一样,我们希望哪个先返回就用那个的返回值
public CompletionStage applyToEither(CompletionStage other,Function fn);
public CompletionStage applyToEitherAsync(CompletionStage other,Function fn);
public CompletionStage applyToEitherAsync(CompletionStage other,Function fn,Executor executor);
7,exceptionally()
功能:当运行出现异常时,调用该方法可进行一些补偿操作,如设置默认值.
场景:异步执行任务A获取结果,如果任务A执行过程中抛出异常,则使用默认值100返回
public CompletionStage exceptionally(Function fn);
8,whenComplete()
功能:当CompletableFuture的计算结果完成,或者抛出异常的时候,都可以进入whenComplete方法执行
public CompletionStage whenComplete(BiConsumer action);
public CompletionStage whenCompleteAsync(BiConsumer action);
public CompletionStage whenCompleteAsync(BiConsumer action,Executor executor);
9,handle()
功能:当CompletableFuture的计算结果完成,或者抛出异常的时候,可以通过handle方法对结果进行处理
public CompletionStage handle(BiFunction fn);
public CompletionStage handleAsync(BiFunction fn);
public CompletionStage handleAsync(BiFunction fn,Executor executor);
10,allOf() anyOf()
功能:allOf当所有的CompletableFuture都执行完后执行计算
功能:anyOf最快的那个CompletableFuture执行完之后执行计算
public static CompletableFuture allOf(CompletableFuture... cfs)
public static CompletableFuture anyOf(CompletableFuture... cfs)
二,带Async的方法和不带此后缀的方法有什么不同
1,不带Async的方法是由触发该任务的线程执行该任务,带Async的方法是由触发该任务的线程将任务提交到线程池,执行任务的线程跟触发任务的线程不一定是同一个。
2,以thenApply和thenApplyAsync 两个方法进行对比,其他的和这个一样的,这两个方法区别就在于谁去执行这个任务,如果使用thenApplyAsync ,那么执行的线程是从ForkJoinPool.commonPool()中获取不同的线程进行执行,如果使用thenApply ,如果supplyAsync 方法执行速度特别快,那么thenApply 任务就是主线程进行执行,如果执行特别慢的话就是和supplyAsync执行线程一样。接下来我们通过例子来看一下,使用sleep方法来反应supplyAsync执行速度的快慢。
同步方法(即不带Async后缀的方法)有两种情况:
1、如果注册时被依赖的操作已经执行完成,则直接由当前线程执行。
2、如果注册时被依赖的操作还未执行完,则由回调线程执行。
异步方法(即带Async后缀的方法):
可以选择是否传递线程池参数 Executor运行在指定线程池中;当不传递Executor时,会使用ForkJoinPool中的共用线程池
supplyAsync还未完成,而thenAccept欲获取supplyAsync返回值时,thenAccept则会将此后续任务放入栈中,由supplyAsync将前置任务执行完毕后继续执行后续任务。此时前置任务与后续任务的执行都是在同一个工作线程中执行。
public static void main(String[] args) throws Exception {
ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync 执行线程:" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "";
}, threadPool1);
//1、此时,如果future1中的业务操作已经执行完毕并返回,则该thenApply直接由当前main线程执行;否则,将会由执行以上业务操作的threadPool1中的线程执行。
CompletableFuture future2 = future1.thenApply(value -> {
System.out.println("thenApply 执行线程:" + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return value + "1";
});
String s = future2.get();
System.out.println("main函数 执行线程->[" + s + "]:" + Thread.currentThread().getName());
}
打印结果:
supplyAsync 执行线程:pool-1-thread-1
thenApply 执行线程:pool-1-thread-1
main函数 执行线程->[1]:main
supplyAsync已完成,thenAccept则获取supplyAsync返回值,将后续任务在当前线程执行完毕,因此前置任务在工作线程中执行,而后续任务在当前线程中执行。
public static void main(String[] args) throws Exception {
ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync 执行线程:" + Thread.currentThread().getName());
return "";
}, threadPool1);
//耗时业务
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//1、此时,如果future1中的业务操作已经执行完毕并返回,则该thenApply直接由当前main线程执行;否则,将会由执行以上业务操作的threadPool1中的线程执行。
CompletableFuture future2 = future1.thenApply(value -> {
System.out.println("thenApply 执行线程:" + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return value + "1";
});
String s = future2.get();
System.out.println("main函数 执行线程->[" + s + "]:" + Thread.currentThread().getName());
}
打印结果:
supplyAsync 执行线程:pool-1-thread-1
thenApply 执行线程:main
main函数 执行线程->[1]:main