响应式编程之浅见


在计算中,响应式编程或反应式编程(英语:Reactive programming)是一种面向数据流和变化传播的声明式编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。

以上是维基百科中对于响应式编程的定义,这段文字很简短同时也很抽象,乍一看或许并不是那么好理解。其实响应式编程对于我们并不是一个陌生的概念,响应式编程的设计理念充斥了软件系统的各个角落。例如我们在设计模式中常提到的观察者模式(Observer Pattern),将多个Observer注册到Observable上,对于Observable的数据变化做出实时反映,这是面向对象设计模式中响应式思想的体现。Java8中的Stream流,CompletableFuture,函数式风格API,一直到Java9引入的Flow API,让我们看到JDK的设计者们对于Java未来的想象,我们可以发现Java语言的发展逐渐趋向于追随函数式和响应式的编程范式。同时RxJava,Project Reactor等第三方库的支持,为构建响应式风格的系统提供了良好的基础建设。尤其是Spring家族作为Java生态的引领者,在Spring Framework 5.0版本开始全面使用Reactive API实现一些功能组件,似乎Spring官方对于响应式风格的推广有强烈信心,那么响应式编程到底能给我们带来什么呢?

阻塞意味着浪费


现代应用程序需要应对大量的并发用户,虽然硬件的性能水平在快速发展,但也往往无法跟上并发量的增长,因此我们需要在软件层面通过更巧妙的设计以有限的硬件资源来支撑较大的并发量。从广义上来说有两种方式来提升系统的性能:

  • 横向扩容,使用更多的服务器等硬件资源,这意味着花费更高的成本。
  • 更有效地使用现有资源,这意味着对软件设计有更高的要求,而响应式编程的目的正在于此。

我们知道利用Java并发编程可以通过调度更多线程充分地压榨CPU,以达到更高的硬件资源利用率,尤其是在I/O密集型的程序中带来的提升是明显的,但同时带来的是更高的代码复杂度以及资源并发访问所导致的各种线程安全问题。

异步编程


通过编写异步非阻塞式的代码,我们可以在当前线程进入I/O等待时,将资源让出执行其他任务,异步处理执行完成后再返回到当前逻辑。在Java中主要有两种异步编程模型:

Callback 回调

package priv.just.framework.webflux.test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;

public class CallbackDemo {

    public static void main(String[] args) throws InterruptedException {
        invokeAsync(System.out::println);
        System.out.println("first");
        new CountDownLatch(1).await();
    }

    private static void invokeAsync(Consumer<String> consumer) {
        ForkJoinPool.commonPool().submit(() -> {
            // ... do something
            Thread.sleep(1000);
            String result = "second";
            consumer.accept(result);
            return result;
        });
    }

}

Future 占位符

package priv.just.framework.webflux.test;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureDemo {

    private static ExecutorService executorService = Executors.newCachedThreadPool();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Future<String> future = executorService.submit(() -> {
            // ... do something
            Thread.sleep(1000);
            return "second";
        });
        System.out.println("first");
        System.out.println(future.get());
    }

}

这两种方式都有其局限性,在一种复杂的业务场景下,多个异步任务很难组合到一起,任务的编排难度大幅增加,而且当多个回调逻辑嵌套在一起时,会产生所谓的 Callback Hell(回调地狱),代码的可读性和可维护性都大幅降低。而响应式库可以帮助我们优雅地进行异步编程。

CompletableFuture 任务编排

我们可以假想这样一个业务场景:

  1. 调用 getUserIds() 获取一组用户ID。
  2. 针对每个用户ID,调用 getUserRole() 获取其角色名称。
  3. 针对每个用户ID,调用 getUserCompany() 获取其公司名称。
  4. 将所有结果组合为 userId-userRole-userCompany 返回。
private List<String> invokeSync() {
    List<String> res = new ArrayList<>();
    List<Long> userIds = getUserIds();
    for (Long userId : userIds) {
        String userRole = getUserRole(userId);
        String userCompany = getUserCompany(userId);
        res.add(userId + "-" + userRole + "-" + userCompany);
    }
    return res;
}

在不采用任何并发及异步编程的情况下显然这段代码的效率是低下的,因为获取多个用户的信息之前存在并行的可能,而获取用户角色和获取用户公司两者之间没有相互依赖的关系,也可以并行化。

@SuppressWarnings("unchecked")
private List<String> invokeAsync() {
    CompletableFuture<List<String>> res = CompletableFuture.supplyAsync(this::getUserIds)
            .thenComposeAsync(userIds -> {
                CompletableFuture<String>[] tasks = userIds.stream().map(userId -> {
                    CompletableFuture<String> getUserRoleTask = CompletableFuture.supplyAsync(() -> getUserRole(userId));
                    CompletableFuture<String> getUserCompanyTask = CompletableFuture.supplyAsync(() -> getUserCompany(userId));
                    return getUserRoleTask.thenCombineAsync(getUserCompanyTask, (userRole, userCompany) -> userId + "-" + userRole + "-" + userCompany);
                }).toArray(CompletableFuture[]::new);
                return CompletableFuture.allOf(tasks).thenApply(v -> Arrays.stream(tasks).map(CompletableFuture::join).collect(Collectors.toList()));
            });
    return res.join();
}

采用 CompletableFuture 对任务进行编排后更充分地利用了多核CPU的性能,代码的执行效率有所提升,但我们发现代码变成了层层嵌套的结构,类似于回调地狱的状况,代码的可读性明显降低,而且在整个程序中普遍实行这样的编程方式代码复杂度是不可想象的。

响应式库将并发问题抽离出来

private ParallelFlux<String> invokeReactive() {
     return getUserIds()
             .parallel(16)
             .runOn(Schedulers.newParallel("myParallel", 16))
             .flatMap(userId -> {
                 Mono<String> userRole = getUserRole(userId);
                 Mono<String> userCompany = getUserCompany(userId);
                 return userRole.zipWith(userCompany,  (role, company) -> userId + "-" + role + "-" + company);
             });
}

在以上的代码中通过调用 parallel() 方法将 Flux 响应式流转化为 ParallelFlux 并行流,使之后的 flatMap 操作并行化,并通过 runOn() 方法指定在某个线程池上运行。在这个过程中我们并没有去考虑诸如线程的创建,任务的编排等问题,因为响应式库帮我们抽象了并发逻辑,很大程度上使我们从复杂的并发编程中解脱出来。

在订阅前什么都不会发生


在响应式编程中数据的消费者是行为的发起者,在消费者对数据进行订阅之前,所有的计算操作都不会发生。

Flux.range(1, 100)
        .map(i -> i * 2)
        .filter(i -> i % 2 == 0)
        .take(10)
        .publishOn(Schedulers.elastic())
        // Nothing Happens Until You subscribe()
        .subscribe(System.out::println);

在数据流被 subscribe() 之前,一切的代码只是在创建异步流程的抽象描述,可以想象成在声明一条数据的流水线,每一个操作符都代表流水线的某个操作环节,而此时是没有数据在这条流水线上流动的,也就是CPU并没有进行计算行为。而当 subscribe() 方法被调用时,数据的订阅者开始向上游发出数据请求,触发整个数据流的计算。

Backpressure 背压


背压本身是对一种现象的描述,它表示的是数据生产者的生产速率大于数据消费者的消费速率,因此造成数据在缓冲区堆积的现象。试想在一条制造业的生产流水线上,如果上游加工环节的工人熟练度大于下游,那么可能造成产品在某一环节产生挤压,而最终产品的生产效率是由下游决定的,即使上游的生产效率再高也没有意义。从软件层面上来说,背压现象往往意味着数据缓冲区被占满,此时如果不对缓冲区容量进行限制,就意味着OOM,而限制缓冲区容量就意味着需要丢弃部分数据。理想情况是数据的生产消费速率是近乎匹配的,此时意味着最大程度地节省内存。而响应式编程中经常提到的背压机制实际指的是对于背压现象的调价,也就是由数据消费者引导生产者的生产速率,使生产者的速率与自身消费速率相匹配。

推拉模型

pull 拉模型的体现,迭代器模式

public static void main(String[] args) {
    Publisher publisher = new Publisher(new int[] {1, 2, 3});
    Publisher.PublisherIterator iterator = publisher.iterator();
    while (iterator.hasNext()) {
        System.out.println(iterator.next());
    }
}

private static class Publisher {

    private final int[] arr;

    public Publisher(int[] arr) {
        this.arr = arr;
    }

    public PublisherIterator iterator() {
        return new PublisherIterator();
    }

    private class PublisherIterator implements Iterator<Integer> {

        private int index;

        @Override
        public boolean hasNext() {
            return index < arr.length;
        }

        @Override
        public Integer next() {
            int res = arr[index];
            index++;
            return res;
        }

    }

}

push 推模型的体现,观察者模式

public static void main(String[] args) throws InterruptedException {
    Publisher publisher = new Publisher(new int[] {1, 2, 3});
    publisher.addObserver((o, data) -> {
        System.out.println(data);
    });
    publisher.notifyObservers();
    new CountDownLatch(1).await();
}

private static class Publisher extends Observable {

    private final int[] arr;

    public Publisher(int[] arr) {
        this.arr = arr;
    }

    @Override
    public void notifyObservers() {
        for (int number : arr) {
            setChanged();
            super.notifyObservers(number);
        }
    }

}

在拉模型中我们往往会采用诸如分页查询的策略对数据进行分批查询,所以一般不会产生背压现象,但数据的生产和消费是交替进行的,两者无法并行最大程度地利用多核CPU的计算性能。在推模型中数据是由生产者驱动的,只要生产速率快于消费速率,数据在内存中积累必然产生背压现象,此时会降低内存利用率,在不加以控制的情况下产生OOM。

拉与推的结合,响应式编程

以 Project Reactor 为例,响应式流为我们提供四种背压策略:

  • ERROR: 当下游跟不上节奏的时候发出一个错误信号。
  • DROP:当下游没有准备好接收新的元素的时候抛弃这个元素。
  • LATEST:让下游只得到上游最新的元素。
  • BUFFER:缓存下游没有来得及处理的元素(如果缓存不限大小的可能导致OutOfMemoryError)。
package priv.just.framework.webflux.test;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class OnBackpressureErrorDemo {

    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));

    public static void main(String[] args) throws InterruptedException {
        Flux.interval(Duration.ofMillis(1))
                .map(i -> ByteBuffer.allocate(1024 * 1024))
                .onBackpressureBuffer(Integer.MAX_VALUE)
                //.onBackpressureError()
                //.onBackpressureDrop()
                //.onBackpressureLatest()
                .subscribe(new BaseSubscriber<ByteBuffer>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        request(1);
                    }

                    @Override
                    protected void hookOnNext(ByteBuffer buffer) {
                        executor.execute(() -> {
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println("next: " + buffer);
                            request(1);
                        });
                    }

                    @Override
                    protected void hookOnComplete() {
                        System.out.println("complete");
                    }

                    @Override
                    protected void hookOnError(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });

        new CountDownLatch(1).await();
    }

}

线程模型的演化


Java BIO 模型

java-bio

在 Java 1.4 之前的 BIO 模型中,客户端每建立一个 socket 连接,服务端都需要创建一个线程并分配给这个连接,直到这个连接被关闭,而在服务端线程是宝贵的资源,如果长时间被占用就是对资源的浪费,所以原生的 Java BIO 只适用于并发量要求很低的系统。

Tomcat BIO 模型

tomcat-bio

tomcat 中的 BIO 线程模型是通过一个 Accept 线程建立与客户端的 TCP 连接,随后将连接交给线程池处理,相比于原生的 Java BIO 模型,线程池本身对于线程资源有整体的管控,意味着对服务端存在一定的保护,但由于依赖的依然是 Java BIO 的 API,线程池资源被占尽后的请求只能进入等待或被拒绝,整体的吞吐量并没有提升。

Tomcat NIO 模型

tomcat-nio

  • Acceptor 线程负责建立连接,同时把连接加入 Poller 队列。
  • 每个 Poller 队列中有一个队列,同时有一个 Selector 轮询这个队列,一旦某个连接被绑定到了某个 Poller 上,绑定关系永远不会改变,Selector 一旦发现该连接上发生读写事件,则将读写事件交给业务线程池处理。
  • 接收新请求与请求的业务处理分离,意味着新请求不会被业务处理阻塞。同时只有在连接上发生读写事件时该请求才会占用业务线程池,显然这种模式对于线程的利用率是更高的。

Netty Reactor 模型

netty-reactor


文章作者: Ethan Zhang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Ethan Zhang !
评论
 上一篇
锁之浅见 锁之浅见
在计算机科学中,锁是在执行多线程时用于强行限制资源访问的同步机制,即用于在并发控制中保证对互斥要求的满足。一般的锁是建议锁(advisory lock),每个线程在访问对应资源前都需获取锁的信息,再根据信息决定是否可以访问。若访问对应信息,
2020-08-17 Ethan Zhang
下一篇 
Nacos 如何适应 Spring Cloud 规范 Nacos 如何适应 Spring Cloud 规范
我们知道在 Java 微服务生态体系的发展过程中,有两种主流的解决方案逐渐受到行业内的关注,一种是基于 Spring 技术体系,由 Spring 官方团队推出的 Spring Cloud 微服务一体式解决方案可以为我们方便地搭建一个微服务
2020-06-20 Ethan Zhang