Zip
We can zip several Reactor streams together, so that each emitted item is a tuple holding one element from every source. For example:
// Requires: import reactor.core.publisher.Flux;
static Flux<User> userFluxFromStringFlux(Flux<String> userName, Flux<String> firstName, Flux<String> lastName) {
    // zip emits a Tuple3 holding the i-th element of each source
    return Flux.zip(userName, firstName, lastName)
            .map(tuple -> new User(tuple.getT1(), tuple.getT2(), tuple.getT3()));
}
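The User type is not shown in this section. A minimal sketch of what it might look like, assuming three String fields and a toString() that prints one labelled field per line (the field names and the output format are assumptions, inferred only from the constructor call and the console output in this section):

```java
// Hypothetical User class; not taken from the original project.
class User {
    private final String userName;
    private final String firstName;
    private final String lastName;

    User(String userName, String firstName, String lastName) {
        this.userName = userName;
        this.firstName = firstName;
        this.lastName = lastName;
    }

    // Assumed format: one "label: value" pair per line.
    @Override
    public String toString() {
        return "username: " + userName + "\n"
                + "firstname: " + firstName + "\n"
                + "lastName: " + lastName;
    }
}
```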
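After zipping, each emitted item is a Tuple (here a Tuple3) that gives access, through getT1(), getT2(), and getT3(), to the elements at the same position in the different Flux sources. zip waits until every source has produced its next element before it emits the combined tuple.
For example: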
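// Requires: java.time.Duration, java.util.concurrent.CountDownLatch, reactor.core.publisher.Flux
static void userFluxFromStringFluxMain() throws InterruptedException {
    // interval() emits on a background scheduler, so block the main thread until the stream completes
    CountDownLatch countDownLatch = new CountDownLatch(1);
    userFluxFromStringFlux(
            // interval(initial delay, period): each source ticks at its own pace
            Flux.interval(Duration.ofSeconds(2), Duration.ofMillis(100)).take(3).map(i -> "userName" + i),
            Flux.interval(Duration.ofSeconds(2), Duration.ofMillis(20)).take(3).map(i -> "firstName" + i),
            Flux.interval(Duration.ofSeconds(1), Duration.ofMillis(200)).take(3).map(i -> "lastName" + i)
    ).doOnComplete(countDownLatch::countDown).subscribe(System.out::println);
    countDownLatch.await();
}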
> Task :app:OtherOperationApp.main()
username: userName0
firstname: firstName0
lastName: lastName0
username: userName1
firstname: firstName1
lastName: lastName1
username: userName2
firstname: firstName2
lastName: lastName2
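The elements line up by index even though the three sources tick at different rates, because the i-th tuple can only be emitted once the slowest source has produced its i-th element. A rough sketch of the nominal emission times for the three intervals used above, in plain Java without Reactor (the class and method names are hypothetical, and real timings will drift slightly with scheduling):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that models zip timing; it is not part of Reactor.
class ZipTimingSketch {

    // Nominal time (ms) at which an interval source emits element i:
    // the first tick arrives after the initial delay, later ticks every period.
    static long emissionTime(long delayMs, long periodMs, int i) {
        return delayMs + periodMs * i;
    }

    // Tuple i is emitted when the latest of the sources has delivered element i.
    static List<Long> zipEmissionTimes(long[][] sources, int count) {
        List<Long> times = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            long latest = 0;
            for (long[] source : sources) {
                latest = Math.max(latest, emissionTime(source[0], source[1], i));
            }
            times.add(latest);
        }
        return times;
    }

    public static void main(String[] args) {
        long[][] sources = {
            {2000, 100}, // userName: 2 s delay, 100 ms period
            {2000, 20},  // firstName: 2 s delay, 20 ms period
            {1000, 200}  // lastName: 1 s delay, 200 ms period
        };
        System.out.println(zipEmissionTimes(sources, 3)); // [2000, 2100, 2200]
    }
}
```

Under these assumptions the userName source is the bottleneck for every tuple, so the three users appear roughly 2.0 s, 2.1 s, and 2.2 s after subscription.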