JUC并发编程-CompletableFuture

Future和Callable接口

  • Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。(异步:可以被叫停,可以被取消)
  • 一句话:Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。

在这里插入图片描述

  • eg.比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,过了一会才去获取子任务的执行结果。老师在上课,但是口渴,于是让班长这个线程去买水,自己可以继续上课,实现了异步任务。
  • 有个目的:异步多线程任务执行且有返回结果,三个特点:多线程/有返回/异步任务

FutureTask实现类

  • FutureTak(实现了x接口,x接口又继承了a和v接口)
    • 在源码可以看到,他既继承了RunnableFuture接口,也在构造方法中实现了Callable接口(有返回值、可抛出异常)和Runnable接口

(ctrl+alt+u)

在这里插入图片描述
在这里插入图片描述

  • 完成上面目的的代码 - 多线程/有返回/异步

    一个主线程,一个mythread|步执行了|返回了”hello callable”

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    FutureTask<String> futureTask = new FutureTask<>(new MyThread());
    Thread t1 = new Thread(futureTask,"t1");
    t1.start();
    System.out.println(futureTask.get());//接收返回值
    }
    }

    class MyThread implements Callable<String>{

    @Override
    public String call() throws Exception {
    System.out.println("-----come in call() ----异步执行");
    return "hello Callable 返回值";
    }
    }
    //结果
    //-----come in call() ----异步执行
    //hello Callable 返回值

Future到CompletableFuture

Future优点

  • future+线程池异步多线程任务配合,能显著提高程序的执行效率。
  • 方案一,3个任务1个main线程处理,大概1130ms

在这里插入图片描述

  • 方案二,3个任务3个线程,利用线程池(假如每次new一个Thread,太浪费资源,会有GC这些工作),大概400毫秒

在这里插入图片描述

Future缺点

1 get()阻塞

一旦调用get()方法,不管是否计算完成,都会导致阻塞(所以一般get方法放到最后)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class FutureAPIDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<String>(()->{
System.out.println(Thread.currentThread().getName()+"\t ------副线程come in");
try {
TimeUnit.SECONDS.sleep(5);//暂停几秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task over";
});
Thread t1 = new Thread(futureTask,"t1");
t1.start();
//-----------------------------------------------------------注意顺序
System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了");
System.out.println(futureTask.get());
//----------------------------------------------------------注意顺序
}
}
//main -------主线程忙其他任务了
//t1 ------副线程come in

public class FutureAPIDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<String>(()->{
System.out.println(Thread.currentThread().getName()+"\t ------副线程come in");
try {
TimeUnit.SECONDS.sleep(5);//暂停几秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task over";
});
Thread t1 = new Thread(futureTask,"t1");
t1.start();
//-----------------------------------------------------------注意顺序
System.out.println(futureTask.get());
System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了");

//----------------------------------------------------------注意顺序
}
}
//t1 ------副线程come in
//-------------------5秒后才出现下面的结果-------------说明一旦调用get()方法直接去找副线程了,阻塞了主线程
//task over
//main -------主线程忙其他任务了

2 isDone()轮询

利用if(futureTask.isDone())的方式使得他在结束之后才get(),但是也会消耗cpu

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class FutureAPIDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
FutureTask<String> futureTask = new FutureTask<String>(()->{
System.out.println(Thread.currentThread().getName()+"\t ------副线程come in");
try {
TimeUnit.SECONDS.sleep(5);//暂停几秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task over";
});
Thread t1 = new Thread(futureTask,"t1");
t1.start();

System.out.println(Thread.currentThread().getName()+"\t-------主线程忙其他任务了");
//1------- System.out.println(futureTask.get(3,TimeUnit.SECONDS));//只愿意等3秒,过了3秒直接抛出异常

//2-------更健壮的方式-------轮询方法---等副线程拿到才去get()
//但是也会消耗cpu资源
while(true){
if(futureTask.isDone()){
System.out.println(futureTask.get());
break;
}else{
//暂停毫秒
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("正在处理中------------正在处理中");
}
}
}
}
//main -------主线程忙其他任务了
//t1 ------副线程come in
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//正在处理中------------正在处理中
//task over

Future应用现状

  • 对于简单的业务场景使用Future完全OK
    • 回调通知
      • 前面的isDone()方法耗费cpu资源,一般应该还是利用回调函数,在Future结束时自动调用该回调函数。应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知
    • 创建异步任务
      • Future+线程池配合
    • 多个任务前后依赖可以组合处理(水煮鱼)
      • 想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值,将两个或多个异步计算合成一个异步计算,这几个异步计算相互独立,同时后面这个又依赖前一个处理的结果
      • 比如买鱼-加料-烹饪
    • 对计算速度选最快完成的(并返回结果)
      • 当Future集合中某个任务最快结束时,返回结果,返回第一名处理结果。

CompletableFuture基本介绍

  • 阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无谓的CPU资源。因此,JDK8设计出CompletableFuture

CompletableFuture

1
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> 

在这里插入图片描述

  • 在Java 8中, Complet able Future提供了非常强大的Future的扩展功能, 可以帮助我们简化异步编程的复杂性, 并且提供了函数式编程的能
    力, 可以通过回调的方式处理计算结果, 也提供了转换和组合Complet able Future的方法。
  • 它可能代表一个明确完成的Future, 也有可能代表一个完成阶段(Completion Stage) , 它支持在计算完成以后触发一些函数或执行某些
    动作。
  • 它实现了FutureCompletion Stage接口

CompletionStage

  • Completion Stage代表异步计算过程中的某一个阶段, 一个阶段完成以后可能会触发另外一个阶段
  • 一个阶段的计算执行可以是一个Function, Consumer或者Runnable。比如:stage.then Apply(x->square(x) ) .then Accept
    (x->System.out.print(x) ) .then Run() ->System.out.print In() )
    ,一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。

核心的四个静态方法(分为两组)

  • 利用核心的四个静态方法创建一个异步操作 | 不建议用new
  • 关键就是 |有没有返回值|是否用了线程池|
  • 参数说明:
    • 没有指定Executor的方法,直接使用默认的ForkJoinPool.commPool()作为它的线程池执行异步代码。
    • 如果指定线程池,则使用我们定义的或者特别指定的线程池执行异步代码。

runAsync无返回值

1 runAsync

1
public static CompletableFuture<Void> runAsync(Runnable runnable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class CompletableFutureBuildDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName());
//停顿几秒线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(voidCompletableFuture.get());
}
}
//ForkJoinPool.commonPool-worker-9 //默认的线程池
//null --- 没有返回值

2 runAsync+线程池

1
2
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class CompletableFutureBuildDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {

ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池

CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {

System.out.println(Thread.currentThread().getName());
//停顿几秒线程
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
},executorService);
System.out.println(voidCompletableFuture.get());
}
}
//pool-1-thread-1 ----指定的线程池
//null ----没有返回值

supplyAsync有返回值

3 supplyAsync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 
1
public class CompletableFutureBuildDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {

ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池

CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "helllo supplyasync";
});
System.out.println(objectCompletableFuture.get());
}
}
//ForkJoinPool.commonPool-worker-9---------默认的线程池
//helllo supplyasync-------------supplyasync有返回值了

4 supplyAsync+线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor)
12
public class CompletableFutureBuildDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {

ExecutorService executorService = Executors.newFixedThreadPool(3);//加入线程池

CompletableFuture<String> objectCompletableFuture = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "helllo supplyasync";
},executorService);
System.out.println(objectCompletableFuture.get());
}
}
//ForkJoinPool.commonPool-worker-9---------默认的线程池
//helllo supplyasync-------------supplyasync有返回值了

CompletableFuture使用演示(日常使用)

基本功能

  • CompletableFuture可以完成Future的功能

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<Object> objectCompletableFuture = CompletableFuture.supplyAsync(()->{
    System.out.println(Thread.currentThread().getName()+"----副线程come in");
    int result = ThreadLocalRandom.current().nextInt(10);//产生一个随机数
    try {
    TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("1秒钟后出结果"+result);
    return result;
    });

    System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
    System.out.println(objectCompletableFuture.get());
    }
    }
    //main线程先去忙其他任务
    //ForkJoinPool.commonPool-worker-9----副线程come in
    //1秒钟后出结果6
    //6

减少阻塞和轮询whenComplete

  • CompletableFuture通过whenComplete减少阻塞和轮询(自动回调)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture.supplyAsync(()->{
    System.out.println(Thread.currentThread().getName()+"--------副线程come in");
    int result = ThreadLocalRandom.current().nextInt(10);//产生随机数
    try {
    TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return result;
    }).whenComplete((v,e) -> {//没有异常,v是值,e是异常
    if(e == null){
    System.out.println("------------------计算完成,更新系统updataValue"+v);
    }
    }).exceptionally(e->{//有异常的情况
    e.printStackTrace();
    System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());
    return null;
    });

    //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
    System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
    try {
    TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    //ForkJoinPool.commonPool-worker-9--------副线程come in(这里用的是默认的ForkJoinPool)
    //main线程先去忙其他任务
    //------------------计算完成,更新系统updataValue3
  • 假如换用自定义线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService threadPool = Executors.newFixedThreadPool(3);
    CompletableFuture.supplyAsync(()->{
    System.out.println(Thread.currentThread().getName()+"--------副线程come in");
    int result = ThreadLocalRandom.current().nextInt(10);//产生随机数
    try {
    TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return result;
    },threadPool).whenComplete((v,e) -> {//没有异常,v是值,e是异常
    if(e == null){
    System.out.println("------------------计算完成,更新系统updataValue"+v);
    }
    }).exceptionally(e->{//有异常的情况
    e.printStackTrace();
    System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());
    return null;
    });

    //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
    System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
    try {
    TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    //pool-1-thread-1--------副线程come in
    //main线程先去忙其他任务
    //------------------计算完成,更新系统updataValue6
  • 异常情况的展示,设置一个异常 int i = 10 / 0 ;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    public class CompletableFutureUseDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService threadPool = Executors.newFixedThreadPool(3);
    CompletableFuture.supplyAsync(()->{
    System.out.println(Thread.currentThread().getName()+"--------副线程come in");
    int result = ThreadLocalRandom.current().nextInt(10);//产生随机数
    try {
    TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("-----结果---异常判断值---"+result);
    //---------------------异常情况的演示--------------------------------------
    if(result > 2){
    int i = 10 / 0 ;//我们主动的给一个异常情况
    }
    //------------------------------------------------------------------
    return result;
    },threadPool).whenComplete((v,e) -> {//没有异常,v是值,e是异常
    if(e == null){
    System.out.println("------------------计算完成,更新系统updataValue"+v);
    }
    }).exceptionally(e->{//有异常的情况
    e.printStackTrace();
    System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());
    return null;
    });

    //线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
    System.out.println(Thread.currentThread().getName()+"线程先去忙其他任务");
    try {
    TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    //pool-1-thread-1--------副线程come in
    //main线程先去忙其他任务
    //-----结果---异常判断值---4 (这里 4 >2了,直接抛出异常)
    //异常情况java.lang.ArithmeticException: / by zero java.lang.ArithmeticException: / by zero
    //java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
    // at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    // at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    // at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
    // at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    // at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    // at java.lang.Thread.run(Thread.java:748)
    //Caused by: java.lang.ArithmeticException: / by zero
    // at com.zhang.admin.controller.CompletableFutureUseDemo.lambda$main$0(CompletableFutureUseDemo.java:19)
    // at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    // ... 3 more

CompletableFuture优点总结

  • 异步任务结束时,会自动回调某个对象的方法;
  • 主线程设置好毁掉后,不再关心异步任务的执行,异步任务之间可以顺序执行
  • 异步任务出错时,会自动回调某个对象的方法。

函数式接口

  • 函数式接口的定义:

    • 任何接口,如果只包含唯一一个抽象方法,那么它就是一个函数式接口。对于函数式接口,我们可以通过lambda表达式来创建该接口的对象。

      1
      2
      3
      public interface Runnable{
      public abstract void run();
      }
  • 常见的函数式接口

    1. Runnable

      1
      2
      3
      4
      @FunctionalInterface
      public interface Runnable {
      public abstract void run();
      }
    2. Function

      1
      2
      3
      4
      @FunctionalInterface
      public interface Function<T, R> {
      R apply(T t);
      }
    3. Consumer

      1
      2
      3
      4
      @FunctionalInterface
      public interface Consumer<T> {
      void accept(T t);
      }
    4. Supplier

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      @FunctionalInterface
      public interface Supplier<T> {

      /**
      * Gets a result.
      *
      * @return a result
      */
      T get();
      }
    5. Biconsumer(Bi代表两个的意思,我们要传入两个参数,在上面的案例中是v和e)

      1
      2
      3
      4
      @FunctionalInterface
      public interface BiConsumer<T, U> {
      void accept(T t, U u);
      }
函数式接口名称 方法名称 参数 返回值
Runnable run 无参数 无返回值
Function apply 1个参数 有返回值
Consume accept 1个参数 无返回值
Supplier get 没有参数 有返回值
Biconsumer accept 2个参数 无返回值

链式调用|链式编程|链式写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Chain {
public static void main(String[] args) {
//-------------------老式写法------------
// Student student = new Student();
// student.setId(1);
// student.setMajor("cs");
// student.setName("小卡");
new Student().setId(1).setName("大卡").setMajor("cs");
}

}

@NoArgsConstructor
@AllArgsConstructor
@Data
@Accessors(chain = true)//开启链式编程
class Student{
private int id;
private String name;
private String major;
}

join和get对比

  • 功能几乎一样,区别在于编码时是否需要抛出异常
    • get()方法需要抛出异常
    • join()方法不需要抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Chain {
public static void main(String[] args) throws ExecutionException, InterruptedException {//抛出异常
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
return "hello 12345";
});
System.out.println(completableFuture.get());
}

}

public class Chain {
public static void main(String[] args) {//抛出异常
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
return "hello 12345";
});
System.out.println(completableFuture.join());
}
}

CompletableFuture常用API

1.获得结果和触发计算

  • 获取结果
    • public T get() 不见不散,容易阻塞
    • public T get(long timeout,TimeUnit unit) 过时不候,超过时间会爆异常
    • public T join() 类似于get(),区别在于是否需要抛出异常
    • public T getNow(T valueIfAbsent)
      • 没有计算完成的情况下,给一个替代结果
      • 立即获取结果不阻塞
        • 计算完,返回计算完成后的结果
        • 没算完,返回设定的valueAbsent(直接返回了备胎值xxx)
  • 主动触发计算
    • public boolean complete(T value) 是否立即打断get()方法返回括号值
      • (执行要2s,等待只有1s,所以还没执行完就被打断了。返回true表示打断了获取这个过程,直接返回了备胎值complete;如果没打断,返回false 和原来的abc)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class CompletableFutureAPIDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);//执行需要2秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});

try {
TimeUnit.SECONDS.sleep(1);//等待需要1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
// System.out.println(uCompletableFuture.getNow("xxx"));//执2-等1 返回xxx
System.out.println(uCompletableFuture.complete("completeValue")+"\t"+uCompletableFuture.get());//执2-等1 返回true+备胎值completeValue
}
}

2.对计算结果进行处理

  • thenApply 计算结果存在在依赖关系,使得线程串行化。因为依赖关系,所以一旦有异常,直接叫停。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    public class CompletableFutureDemo2
    {
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
    //当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
    CompletableFuture.supplyAsync(() -> {
    //暂停几秒钟线程
    try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
    System.out.println("111");
    return 1024;
    }).thenApply(f -> {
    System.out.println("222");
    return f + 1;
    }).thenApply(f -> {
    //int age = 10/0; // 异常情况:那步出错就停在那步。
    System.out.println("333");
    return f + 1;
    }).whenCompleteAsync((v,e) -> {
    System.out.println("*****v: "+v);
    }).exceptionally(e -> {
    e.printStackTrace();
    return null;
    });

    System.out.println("-----主线程结束,END");

    // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
    try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
    }
    }
    //-----正常情况
    //111
    //222
    //333
    //----计算结果: 6

    //-----异常情况
    //111
    //异常.....
  • handle 类似于thenApply,但是有异常的话仍然可以往下走一步。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    public class CompletableFutureDemo2
    {

    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
    //当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,
    // 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理
    CompletableFuture.supplyAsync(() -> {
    //暂停几秒钟线程
    try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
    System.out.println("111");
    return 1024;
    }).handle((f,e) -> {
    int age = 10/0;//异常语句
    System.out.println("222");
    return f + 1;
    }).handle((f,e) -> {
    System.out.println("333");
    return f + 1;
    }).whenCompleteAsync((v,e) -> {
    System.out.println("*****v: "+v);
    }).exceptionally(e -> {
    e.printStackTrace();
    return null;
    });

    System.out.println("-----主线程结束,END");

    // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
    try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
    }
    }
    //-----异常情况
    //111
    //333
    //异常,可以看到多走了一步333
  • 一般用thenApply

3.对计算结果进行消费

  • 接收任务的处理结果,并消费处理,无返回结果|消费型函数式接口,之前的是Function

  • thenAccept

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
    CompletableFuture.supplyAsync(() -> {
    return 1;
    }).thenApply(f -> {
    return f + 2;
    }).thenApply(f -> {
    return f + 3;
    }).thenApply(f -> {
    return f + 4;
    }).thenAccept(r -> System.out.println(r));
    }
    //6
    //消费一下,直接得到6
  • 补充:Code之任务之间的顺序执行

    • thenRun
      • thenRun(Runnable runnable)
      • 任务A执行完执行B,并且B不需要A的结果
    • thenAccept
      • thenAccept(Consumer action)
      • 任务A执行完执行B,B需要A的结果,但是任务B无返回值
    • thenApply
      • thenApply(Function fn)
      • 任务A执行完执行B,B需要A的结果,同时任务B有返回值
    1
    2
    3
    4
    5
    6
    7
    8
    9
    System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
    //null

    System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join());
    //resultA打印出来的 null因为没有返回值

    System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());
    //resultAresultB 返回值
    123456789

4.CompleteFuture和线程池说明(非常重要)

  • 上面的几个方法都有普通版本和后面加Async的版本

  • thenRunthenRunAsync为例,有什么区别?

  • 先看结论

    1. 没有传入自定义线程池,都用默认线程池ForkJoinPool
    2. 传入了一个自定义线程池如果你执行第一个任务的时候,传入了一个自定义线程池
      • 调用thenRun方法执行第二个任务的时候,则第二个任务和第一个任务是用同一个线程池
      • 调用thenRunAsync执行第二个任务的时候,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池
    3. 也有可能处理太快,系统优化切换原则,直接使用main线程处理(把sleep去掉)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    //2-1
    public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService threadPool = Executors.newFixedThreadPool(5);
    CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
    try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
    return "abcd";
    },threadPool).thenRun(()->{
    try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
    }).thenRun(()->{
    try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
    }).thenRun(()->{
    try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
    });
    }
    }
    //1号任务 pool-1-thread-1
    //2号任务 pool-1-thread-1
    //3号任务 pool-1-thread-1
    //4号任务 pool-1-thread-1
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    //2-2
    public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService threadPool = Executors.newFixedThreadPool(5);
    CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
    try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
    return "abcd";
    },threadPool).thenRunAsync(()->{
    try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
    }).thenRun(()->{
    try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
    }).thenRun(()->{
    try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
    });
    }
    }
    //1号任务 pool-1-thread-1
    //2号任务 ForkJoinPool.commonPool-worker-9---这里另起炉灶重新调用了默认的ForkJoinPool
    //3号任务 ForkJoinPool.commonPool-worker-9
    //4号任务 ForkJoinPool.commonPool-worker-9
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public class CompletableFutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService threadPool = Executors.newFixedThreadPool(5);
    CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(()->{
    // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
    return "abcd";
    },threadPool).thenRun(()->{
    // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
    }).thenRun(()->{
    // try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
    }).thenRun(()->{
    //try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}
    System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
    });
    }
    }
    //1号任务 1号任务 pool-1-thread-1
    //2号任务 main
    //3号任务 main
    //4号任务 main
  • 源码

    1
    2
    3
    4
    5
    6
    7
    8
    //CompletableFuture.java 2009行
    public CompletableFuture<Void> thenRun(Runnable action) {//传入值是一样的
    return uniRunStage(null, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(asyncPool, action);//但是这里有个异步的线程池asyncPool
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    //进入asyncPool
    private static final boolean useCommonPool =
    (ForkJoinPool.getCommonPoolParallelism() > 1);//一般大于1都是成立的

    /**
    * Default executor -- ForkJoinPool.commonPool() unless it cannot
    * support parallelism.
    */
    private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();//所以这里会调用forkJoin线程池

5.对计算速度进行选用

  • applyToEither方法,快的那个掌权
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CompletableFutureDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException
{
CompletableFuture<String> play1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
return "play1 ";
});

CompletableFuture<String> play2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
return "play2";
});

CompletableFuture<String> thenCombineResult = play1.applyToEither(play2, f -> {//对计算速度进行选用
return f + " is winner";
});

System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get());
}
}
//ForkJoinPool.commonPool-worker-9 ---come in
//ForkJoinPool.commonPool-worker-2 ---come in
//main play2 is winner

6.对计算结果进行合并

  • thenCombine 合并

    • 两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCOmbine来处理
    • 先完成的先等着,等待其它分支任务
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public class CompletableFutureDemo2
    {
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
    CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
    return 10;
    });

    CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
    return 20;
    });

    CompletableFuture<Integer> thenCombineResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
    System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
    return x + y;
    });

    System.out.println(thenCombineResult.get());
    }
    }
    //30
    • 合并版本
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class CompletableFutureDemo2
    {
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
    CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");
    return 10;
    }).thenCombine(CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");
    return 20;
    }), (x,y) -> {
    System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");
    return x + y;
    }).thenCombine(CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");
    return 30;
    }),(a,b) -> {
    System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");
    return a + b;
    });
    System.out.println("-----主线程结束,END");
    System.out.println(thenCombineResult.get());


    // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
    try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
    }
    }

JUC并发编程-CompletableFuture
https://yztldxdz.top/2022/11/20/JUC并发编程-CompletableFuture/
发布于
2022年11月20日
许可协议