05-流式操作:使用 Flux 和 Mono 构建响应式数据流
1 通过 Flux 对象创建响应式流
相对而言,静态方法在使用上都比较简单,但不如动态方法来得灵活。我们来一起看一下。
2 通过静态方法创建 Flux
中静态创建 Flux 的方法常见的包括 just()、range()、() 以及各种以 from- 为前缀的方法组等。因为 Flux 可以代表 0 个数据,所以也有一些专门用于创建空序列的工具方法。
2.1 just() 方法
我已经在上一讲为你演示过 just() 方法,它可以指定序列中包含的全部元素,创建出来的 Flux 序列在发布这些元素之后会自动结束。一般情况下,在已知元素数量和内容时,使用 just() 方法是创建 Flux 的最简单直接的做法。
示例:
Flux.just("Hello", "World").subscribe(System.out::println);
Hello
World
这里我们对 Flux 执行了用于订阅的 () 方法,并通过使用 表达式调用了 .out.() 方法,这意味着将结果打印到系统控制台。关于 () 方法以及对响应式流的订阅过程,我会在本讲后续内容中进一步说明。
() 方法组
如果我们已经有了一个数组、一个 对象或 对象,那么就可以通过 Flux 提供的 () 方法组来从这些对象中自动创建 Flux,包括 ()、() 和 () 方法。
示例:
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
执行结果
1
2
3
range() 方法
如果你快速生成一个整数数据流,那么可以采用 range() 方法,该方法允许我们指定目标整数数据流的起始元素以及所包含的个数,序列中的所有对象类型都是 ,这在创建连续的年份信息或序号信息等场景下非常有用。使用 range() 方法创建 Flux 对象的示例代码如下所示。
Flux.range(2020, 5).subscribe(System.out::println);
显然,这段代码会在控制台中打印出 5 行记录,从 2020 开始,到 2024 结束。
() 方法
在 框架中,() 方法可以用来生成从 0 开始递增的 Long 对象的数据序列。通过 () 所具备的一组重载方法,我们可以分别指定这个数据序列中第一个元素发布之前的延迟时间,以及每个元素之间的时间间隔。() 方法相对复杂,我们先附上它的弹珠图,如下所示。
使用 () 方法创建 Flux 示意图(来自 官网)
可以看到,上图中每个元素发布时相当于添加了一个定时器的效果。使用 () 方法的示例代码如下所示。
Flux.interval(Duration.ofSeconds(2), Duration.ofMillis(200)).subscribe(System.out::println);
这段代码的执行效果相当于在等待 2 秒钟之后,生成一个从 0 开始逐一递增的无界数据序列,每 200 毫秒推送一次数据。
empty()、error() 和 never()
根据上一讲介绍的 异步序列的语义,我们可以分别使用 empty()、error() 和 never() 这三个方法类创建一些特殊的数据序列。其中,如果你希望创建一个只包含结束消息的空序列,那么可以使用 empty() 方法,使用示例如下所示。显然,这时候控制台应该没有任何的输出结果。
Flux.empty().subscribe(System.out::println);
然后,通过 error() 方法可以创建一个只包含错误消息的序列。如果你不希望所创建的序列不发出任何类似的消息通知,也可以使用 never() 方法实现这一目标。当然,这几个方法都比较少用,通常只用于调试和测试。
小结
不难看出,静态创建 Flux 的方法简单直接,一般用于生成那些事先已经定义好的数据序列。
而如果:
就需要用到动态创建方法。
3 通过动态方法创建 Flux
动态创建 Flux 所采用的就是以编程的方式创建数据序列,最常用的就是 () 方法和 () 方法。
() 方法
() 方法生成 Flux 序列依赖于 所提供的 组件,定义如下。
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)
组件包括 next()、() 和 error() 这三个核心方法。从 组件的命名上就能知道它是一个同步的 Sink 组件,也就是说元素的生成过程是同步执行的。
next() 方法只能最多被调用一次。使用 () 方法创建 Flux 的示例代码如下。
Flux.generate(sink -> {sink.next("javaedge");sink.complete();
}).subscribe(System.out::println);
运行该段代码,会在系统控制台上得到“”。我们在这里调用了一次 next() 方法,并通过 () 方法结束了这个数据流。如果不调用 () 方法,那么就会生成一个所有元素均为“”的无界数据流。
这个示例非常简单,但已经具备了动态创建一个 Flux 序列的能力。如果想要在序列生成过程中引入状态,那么可以使用如下所示的 () 方法重载。
Flux.generate(() -> 1, (i, sink) -> {sink.next(i);if (i == 5) {sink.complete();}return ++i;
}).subscribe(System.out::println);
引入一个代表中间状态的变量 i,然后根据 i 的值来判断是否终止序列。显然,以上代码的执行效果会在控制台中输入 1 到 5 这 5 个数字。
()
() 方法与 () 方法比较类似,但它使用的是一个 组件,定义如下。
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)
除了 next()、() 和 error() 这三个核心方法外,还定义了背压策略,并且可以在一次调用中产生多个元素。使用 () 方法创建 Flux 的示例代码如下。
Flux.create(sink -> {for (int i = 0; i < 5; i++) {sink.next("javaedge" + i);}sink.complete();
}).subscribe(System.out::println);
运行该程序,我们会在系统控制台上得到从“”到“”的 5 个数据。通过 () 方法创建 Flux 对象的方式非常灵活,在本专栏中会有多种场景用到这个方法。
以上就是通过Flux 对象创建响应式流的方法,此外,还可以通过 Mono 对象来创建响应式流,我们一起来看一下。
4 通过 Mono 对象创建响应式流
可认为它是 Flux 的一种特例,所以很多创建 Flux 的方法同样适用。针对静态创建 Mono 的场景,前面给出的 just()、empty()、error() 和 never() 等方法同样适用。除了这些方法之外,比较常用的还有 () 等方法。
() 方法会先判断所传入的对象中是否包含值,只有在传入对象不为空时,Mono 序列才生成对应的元素,该方法示例代码如下。
Mono.justOrEmpty(Optional.of("javaedge")).subscribe(System.out::println);
另一方面,如果要想动态创建 Mono,我们同样也可以通过 () 方法并使用 组件,示例代码如下。
Mono.create(sink ->
sink.success("javaedge")).subscribe(System.out::println);