Concat Vs Merge
Concat
will not have interleave, which means it will wait until the first source finish before moving on to the second.
Merge
will have interleave as it will just dispatch whichever one finish first without waiting
Concat
We can use either of these syntaxes
public static Flux<User> concatWithNoInterleave(Flux<User> flux1, Flux<User> flux2) {
return flux1.concatWith(flux2); // wait for flux 1 finish before using flux2
}
public static Flux<User> concatNoInterleave(Flux<User> flux1, Flux<User> flux2) {
return Flux.concat(flux1, flux2); // wait for flux 1 finish before using flux2
}
For example:
public static void mergeFluxMain(BiFunction<Flux<User>, Flux<User>, Flux<User>> callback) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux<User> flux1 = Flux.just(
new User("austin"),
new User("test"),
new User("david"),
new User("don")
).delayElements(Duration.ofMillis(1000));
Flux<User> flux2 = Flux.just(
new User("partir"),
new User("peter")
);
callback.apply(flux1, flux2)
.doOnComplete(countDownLatch::countDown)
.subscribe(user -> System.out.println(user.username));
countDownLatch.await();
}
public static void main(String[] args) throws InterruptedException {
// mergeFluxMain(MergeApp::concatWithNoInterleave);
// Same as
mergeFluxMain(MergeApp::concatNoInterleave);
}
This will result into
austin
test
david
don
partir
peter
Because it will wait for flux1
to finish first before emitting to flux2
.
Merge
Merge can be use with in both these syntaxes:
public static Flux<User> mergeFluxWithInterleave(Flux<User> flux1, Flux<User> flux2) {
return flux1.mergeWith(flux2); // Does not wait for flux1, first come first serve
}
public static Flux<User> mergeInterleave(Flux<User> flux1, Flux<User> flux2) {
return Flux.merge(flux1, flux2); // Does not wait for flux1, first come first serve
}
Given the same mergeFluxMain
function above, we can call using the following:
public static void main(String[] args) throws InterruptedException {
// mergeFluxMain(MergeApp::mergeFluxWithInterleave);
// Same as
mergeFluxMain(MergeApp::mergeInterleave);
}
Which will have
partir
peter
austin
test
david
don
In this case, it will execute whichever stream finishes first, therefore results into interleaving between 2 streams.