函数式编程是种编程方式,它将计算机的运算视为函数的计算。函数编程语言最重要的基础是λ演算(lambdacalculus),而λ演算的函数可以接受函数当作输入(参数)和输出(返回值)。lambda表达式对与大多数程序员已经很熟悉了,jdk8以及es6都是引入的lambda。
在一个事件驱动的应用程序中,组件之间的交互是通过松耦合的生产者(production)和消费者(consumption)来实现的。这些事件是以异步和非阻塞的方式发送和接收的。
事件驱动的系统依靠推模式而不是拉模式或投票表决,即生产者是在有消息时才推送数据给消费者,而不是通过一种浪费资源方式:让消费者不断地轮询或等待数据。
程序发起执行以后,应该快速返回存储结果的上下文,把具体执行交给后台线程。待处理完成以后,异步地将真实返回值封装在此上下文中,而不是阻塞程序的执行。实时响应是通过异步编程实现的,例如:发起调用后,快速返回类似java8中CompletableFuture对象。
事件驱动的松散耦合提供了组件在失败下,可以抓获完全隔离的上下文场景,作为消息封装,发送到下游组件。在具体编程时可以检查错误,比如:是否接收到,接收的命令是否可执行等,并决定如何应对。
Reactor框架是Pivotal基于ReactiveProgramming思想实现的。它符合ReactiveStreams规范(ReactiveStreams是由Netflix、TypeSafe、Pivotal等公司发起的)的一项技术。其名字有反应堆之意,反映了其背后的强大的性能。
ReactiveProgramming,中文称反应式编程。ReactiveProgramming是一种非阻塞、事件驱动数据流的开发方案,使用函数式编程的概念来操作数据流,系统中某部分的数据变动后会自动更新其他部分,而且成本极低。
其最早是由微软提出并引入到.NET平台中,随后ES6也引入了类似的技术。在Java平台上,较早采用反应式编程技术的是Netflix公司开源的RxJava框架。Hystrix就是以RxJava为基础开发的。
反应式编程其实并不神秘,通过与我们熟悉的迭代器模式对比,便可了解其基本思想:
上面表格的中的Observable那一列便代表反应式编程的API的使用方式。它其实是观察者模式的一种延伸。
如果将迭代器模式看作是拉模式,那观察者模式便是推模式。
如果Publisher发布消息太快了,超过了Subscriber的处理速度,那怎么办?这就是Backpressure的由来。ReactiveProgramming框架需要提供背压机制,使得Subscriber能够控制消费消息的速度。
ReactiveStreams由以下几个组件组成:
其主要的接口有这三个:
其中,Subcriber中便包含了上面表格提到的onNext、onError、onCompleted这三个方法。对于ReactiveStreams,只需要理解其思想就可以,包括基本思想以及Backpressure等思想即可。
Reactor框架主要有两个主要的模块:
在Reactor中,经常使用的类并不是很多,主要有以下两个:
Mono实现了org.reactivestreams.Publisher接口,代表0到1个元素的发布者。
Flux同样实现了org.reactivestreams.Publisher接口,代表0到N个元素的发表者。
代表背后驱动反应式流的调度器,通常由各种线程池实现。
Spring5引入的一个基于Netty而不是Servlet的高性能的Web框架-SpringWebFlux,但是使用方式并没有同传统的基于Servlet的SpringMVC有什么大的不同。
WebFlux中MVC接口的示例:
上面介绍了反应式编程的一些概念。可能读者看到这里有些乱,梳理一下三者的关系:
其实,对于业务开发人员来说,当编写反应式代码时,通常只会接触到Publisher这个接口,对应到Reactor便是Mono和Flux。
对于Subscriber和Subcription这两个接口,Reactor也有相应的实现。这些都是SpringWebFlux和SpringDataReactive这样的框架用到的。如果不开发中间件,开发人员是不会接触到的。
接下来介绍一下Reactor中Mono和Flux这两个类中的主要方法的使用。
如同Java8所引入的Stream一样,Reactor的使用方式基本上也是分三步:
只不过创建和消费可能是通过像Spring5这样框架完成的(比如通过WebFlux中的WebClient调用HTTP接口,返回值便是一个Mono)。但我们还是需要基本了解这些阶段的开发方式。
使用Reactor编程的开始必然是先创建出Mono或Flux。有些时候不需要我们自己创建,而是实现例如WebFlux中的WebClient或SpringDataReactive得到一个Mono或Flux。
上面是通过一个同步调用得到的结果创建出Mono或Flux,但有时需要从一个非Reactive的异步调用的结果创建出Mono或Flux。
如果这个异步方法返回一个CompletableFuture,那可以基于这个CompletableFuture创建一个Mono:
中间阶段的Mono和Flux的方法主要有filter、map、flatMap、then、zip、reduce等。这些方法使用方法和Stream中的方法类似。
下面举几个Reactor开发实际项目的问题,帮大家理解这些方法的使用场景:
本段内容将涉及到如下类和方法:
在Mono和Flux中间环节的处理过程中,有三个有些类似的方法:map()、flatMap()和then()。这三个方法的使用频率很高。
Objectresult1=doStep1(params);Objectresult2=doStep2(result1);Objectresult3=doStep3(result2);Mono.just(params).flatMap(v->doStep1(v)).flatMap(v->doStep2(v)).flatMap(v->doStep3(v));从上面两段代码的对比就可以看出来flatMap()方法在其中起到的作用,map()和then()方法也有类似的作用。但这些方法之间的区别是什么呢?我们先来看看这三个方法的签名(以Mono为例):
then()看上去是下一步的意思,但它只表示执行顺序的下一步,不表示下一步依赖于上一步。then()方法的参数只是一个Mono,无从接受上一步的执行结果。而flatMap()和map()的参数都是一个Function,入参是上一步的执行结果。
flatMap()和map()的区别在于,flatMap()中的入参Function的返回值要求是一个Mono对象,而map的入参Function只要求返回一个普通对象。在业务处理中常需要调用WebClient或ReactiveXxxRepository中的方法,这些方法的返回值都是Mono(或Flux)。所以要将这些调用串联为一个整体链式调用,就必须使用flatMap(),而不是map()。
并发执行是常见的一个需求。ReactiveProgramming虽然是一种异步编程方式,但是异步不代表就是并发并行的。
在传统的命令式编程中,并发执行是通过线程池加Future的方式实现的。
这时应该使用Mono和Flux中的zip()方法,以Mono为例,代码如下:
但上述代码存在一个问题,就是zip()方法需要做强制类型转换。而强制类型转换是不安全的。好在zip()方法存在多种重载形式。除了最基本的形式以外,还有多种类型安全的形式:
Mono.zip(item1Mono,item2Mono).map(tuple->{CustomType1item1=tuple.getT1();CustomType2item2=tuple.getT2();//DomergereturnmergeResult;});上述代码中,map()方法的参数是一个Tuple2,表示一个二元数组,相应的还有Tuple3、Tuple4等。
另外一个稍微复杂的场景,对一个对象中的一个类型为集合类的(List、Set)进行处理之后,再对原本的对象进行处理。使用迭代器模式的代码很容易编写:
可以看出,reduce()方法的功能是将一个Flux聚合成一个Mono。
接下来看一下示例:
直接消费的Mono或Flux的方式就是调用subscribe()方法。如果在WebFlux接口中开发,直接返回Mono或Flux即可。WebFlux框架会完成最后的Response输出工作。