有时候,合适的异步编排可以大大提高数据获取的效率,本文介绍在项目中如何使用 CompletableFuture 实现异步编排,进而加速订单数据的获取。

CompletableFuture 实践

一、简介

在 JDK1.5 中引入了 Future 接口来描述异步运算的结果。可以通过方法 isDone 方法来检查计算是否完成,或者使用 get 阻塞调用线程,直到计算完成返回结果,或者通过 cancel 方法停止任务的执行。

虽然 Future 接口提供了异步执行任务的能力,但是对于结果的获取却不是很方便,只能通过阻塞或轮询方式得到任务的结果。阻塞方式显然和异步编程的初衷背离,轮询的方式有会浪费无谓的CPU资源,而且也不能及时的计算结果。

很多语言, 比如 Node.js, 采用回调的方式实现异步编程。 Java 的一些框架, 比如 Netty, 自己扩展了 Java 的 Future接口, 提供了addListener等多个扩展方法; Google guava 也提供了通用的扩展 Future; Scala 也提供了简单易用且功能强大的 Future/Promise 异步编程模式。 但Future并不可以将其他运行组合或则进行异常处理的方法。

同样地,在 Java 8 中, 新增加了一个包含 50 个方法左右的类: CompletableFuture, 提供了非常强大的Future 的扩展功能, 可以帮助我们简化异步编程的复杂性, 提供了函数式编程的能力, 可以通过回调的方式处理计算结果, 并且提供了转换和组合 CompletableFuture 的方法。CompletableFuture 类实现了 Future 接口, 所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果, 但是这种方式不推荐使用 。

CompletableFuture 和 FutureTask 同属于 Future 接口的实现类, 都可以获取线程的执行结果。

image-20210622184727953

有了对 CompletableFuture 的基本了解,下面来简单介绍 Completeable 的一些 API 。

创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。

image-20210622194843007

  • runXxxx 都是没有返回结果的, supplyXxx 都是可以获取返回结果的 ;
  • 可以传入自定义的线程池, 否则就用默认的线程池;

计算完成回调方法

image-20210622194800177

  • whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况;
  • whenCompleteAsync 把任务交给线程池异步执行,whenComplete 是执行当前任务的线程继续执行;

handle 方法

image-20210622194630102

  • 和 complete 类似,可以对结果做最后处理(可处理异常),可改变返回值。

线程串行化方法

image-20210622194544713

  • thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值;
  • thenAccept 方法:接收任务的处理结果,并消费处理,无返回结果;
  • thenRun 方法:只要上面任务完成,就开始执行 thenRun,只是处理完任务后,执行 thenRun 的后继操作。

两任务组合-都完成

image-20210622194451406

两个任务都必须完成,才触发该任务。

  • thenCombine: 组合两个 future, 获取两个 future 的返回结果,并返回当前任务的返回值

  • thenAcceptBoth: 组合两个 future, 获取两个 future 任务的返回结果, 然后处理任务,没有返回值。

  • runAfterBoth: 组合两个 future, 不需要获取 future 的结果, 只需两个 future 处理完任务后,处理该任务。

    两任务完成-完成其一

image-20210622194935711

当两任务中,任意一个Future任务完成是,执行任务。

  • applyToEither: 两个任务有一个执行完成, 获取它的返回值, 处理任务并有新的返回值。

  • acceptEither: 两个任务有一个执行完成, 获取它的返回值, 处理任务, 没有新的返回值。

  • runAfterEither: 两个任务有一个执行完成, 不需要获取 future 的结果, 处理任务, 也没有返回值。

多任务组合

image-20210622195045142

  • allOf:等待所有任务完成
  • anyOf:只要一个任务完成

二、CompletableFuture 在项目的运用

在查询商品详情页时,需要调用多个模块的接口获取多种数据(如会员模块的用户数据、库存模块的库存数据、优惠模块的优惠卷数据等等)。

由于这些数据分布在不同的服务中,这些数据的时候需要通过远程调用来获取,如常见的OpenFeign。

由于部分数据不存在依赖性限制,即一些数据查询是可以相对独立的,此时可以考虑使用异步编排来同时获取数据,优化查询效率。

如在项目platform-order#toTrade查询创建订单时,需要调用三个服务:

  • 调用【用户服务】查询购物车地址;(假设100ms完成)
  • 调用【用户服务】查询用户优惠信息;(假设101ms完成)
  • 调用【购物车服务】查询购物车选中项;(假设102ms完成)
  • 调用【库存服务】查询商品库存;(假设103ms完成)
Precursor_figure

如果采用串行执行,则需要花费:100ms+101ms+102ms+103ms=405ms(串行执行,依次查询)

如果采用异步编排,则需要花费:102ms+103ms=205ms(只需要考虑最长路径)

从分析可以得到,如果不考虑线程上下文切换等因素,异步编排将能够大大提高商品详情页的数据。

简化代码如下:CompletableFuture + ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Override
public OrderConfirmVo toTrade() throws ExecutionException, InterruptedException {
OrderConfirmVo confirmVo = new OrderConfirmVo();
MemberRespVo memberRespVo = UserContextHolder.getCurrentUser();

CompletableFuture<Void> getAddress = CompletableFuture.runAsync(() -> {
// 1、查询所有的收货地址【用户服务】
List<MemberAddressVo> address = memberFeignService.addressList(memberRespVo.getId());
confirmVo.setAddressList(address);
}, poolExecutor);

CompletableFuture<Void> getItems = CompletableFuture.supplyAsync(() -> {
// 2、查询购物车选中的商品【购物车服务】
List<OrderItemVo> items = cartFeignService.listOwnCheckItem();
confirmVo.setItemVoList(items);
return items;
}, poolExecutor).thenAcceptAsync((items) -> {
// 3、查询远程是否存在库存【库存服务】
List<SkuStockSearchVo> stockStatus = wareFeignService.batchSkuStock(skuIds);
if (stockStatus != null) {
Map<Long, Boolean> map = stockStatus.stream().collect(Collectors.toMap(SkuStockSearchVo::getSkuId, SkuStockSearchVo::getStock));
confirmVo.setHasStockMap(map);
}
}, poolExecutor);

// 4、查询用户优惠信息【用户服务】(简化处理)
Integer integration = memberRespVo.getIntegration();
confirmVo.setIntegration(integration);

// 5、生成防重令牌,分别保存在Redis与返回页面,这样就可以在页面提交的时候直接校验(保重接口幂等性)
String orderToken = UUID.randomUUID().toString().replace("-", "");
confirmVo.setOrderToken(orderToken);
redisTemplate.opsForValue().set(PlatformOrderConstant.USER_ORDER_TOKEN_PREFIX + memberRespVo.getId(), orderToken);

CompletableFuture.allOf(getAddress, getItems).get();

return confirmVo;
}

三、用户信息丢失

在使用异步编排的时候,需要特别注意一个问题,就是异步化是否会对原有的逻辑造成影响

在本项目中,由于采用的是 Cookie 进行身份识别,如果直接采用上述代码而不是使用添加额外措施,将会导致在远程调用的时候身份信息丢失,进而导致查询远程接口失败。下面我们就来观察直接异步化存在的问题。

首先,将异步化的程序运行一遍,查看是否能够正常获取数据(串行时是正常的)

调用【购物车服务】时报错,查看报错信息发现用户信息尚未认证。

在购物车服务,在拦截器中会根据 Cookie 信息获取验证用户,如果授权成功则会将信息存放到ThreadLocal中

分析:为什么在串行编程时通过openfeign调用第三方就可以?而异步编排就会丢失信息呢?

默认情况下,使用OpenFeign的时候会丢失header信息的,其中丢失的信息就包含了Cookie信息。

为了解决OpenFeign构造请求丢失header信息问题,考虑配置一个请求拦截器RequestInterceptor,统一拦截请求来完成设置header等相关请求。

下面代码中,配置了一个RequestInterceptor,当OpenFeign都会经过该拦截器,而该拦截器会为每个请求添加上原请求的header信息(其中包含了用于身份认证的Cookie信息)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Configuration
public class FeignConfig {
@Bean
public RequestInterceptor requestInterceptor() {
return new RequestInterceptor() {
@Override
public void apply(RequestTemplate template) {
// 1、获取到进入该请求的上下文保持器
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (requestAttributes != null) {
// 2、从中获取Request对象
HttpServletRequest request = requestAttributes.getRequest();
// 3、从请求头中获取Cookie
String cookies = request.getHeader("Cookie");
// 4、将Cookie放到Feign的远程调用的请求中
template.header("Cookie", cookies);
}
}
};
}
}

那么问题来了,原请求的信息是如何获取到的呢?那就是通过RequestContextHolder,通过RequestContextHolder可以获取到当前请求的信息,其中就包含了当前请求中的header信息。

而RequestContextHolder内部是通过“thread-bound”方式来共享请求信息的。当一个请求来临,Web容器会为其分配一个用于该请求的线程。由于每个请求都会有一个与之对应的线程进行处理,于是人们就考虑将请求的信息保存在ThreadLocal中,实现请求信息与线程绑定。这样就可以实现,在当前请求的任意位置,只要属于当前线程,就可以获取到请求信息,这也是RequestContextHolder的基本原理。

现在回到一开始的疑问,为啥在串行执行时OpenFeign过载的请求就能成功获取到数据,而异步编排就不行呢?

通过上面的分析,原因就很明了了,在串行执行中OpenFeign构造请求时始终是在一个请求中,通过该请求可获取到当前的信息;但是在异步编排时,是在线程池获取的线程中构造请求,自然就没办法拿到数据,自然就没办法获取到请求头数据了。

那么是不是使用了OpenFeign就没办法使用异步编排了呢?显然不是,我们只需要在异步编排内部重新设置当前的请求属性即可了,详情看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
public OrderConfirmVo toTrade() throws ExecutionException, InterruptedException {
OrderConfirmVo confirmVo = new OrderConfirmVo();
MemberRespVo memberRespVo = UserContextHolder.getCurrentUser();
// 在异步编排外部获取当前请求的信息。【主处理线程】
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();

CompletableFuture<Void> getAddress = CompletableFuture.runAsync(() -> {
// 在异步编排内部重新设置主处理线程的请求信息,使OpenFeign在构造请求时可以顺利获取到相应的信息。
RequestContextHolder.setRequestAttributes(requestAttributes);
// ...
}, poolExecutor);
// ...
}

自此,在OpenFeign下使用CompletableFutrue的问题总算是解决完毕。

如果需要查看更加详细的代码,通过去访问Github中Platform仓库进行查看。

四、参考文献

评论