Blocking To Flux
We can convert a blocking operation into a Flux. For example, consider the following code:
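public class BlockingRepository<T> {
    // Simulates a slow, blocking data source
    public Iterable<User> findAll() {
        try {
            Thread.sleep(5000);
            return List.of(
                new User("Austin"),
                new User("Don"),
                new User("David"),
                new User("Peter")
            );
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fall back to an empty result
            Thread.currentThread().interrupt();
            return Collections.emptyList();
        }
    }
}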
When calling blockingRepository.findAll(), the calling thread has to wait 5 seconds for the operation to complete.
To avoid blocking the caller, we can use subscribeOn() to move the work onto a Scheduler:
Flux<User> blockingRepositoryToFlux(BlockingRepository<User> blockingRepository) {
    return Flux
        .defer(() -> Flux.fromIterable(blockingRepository.findAll()))
        .subscribeOn(Schedulers.boundedElastic());
}
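Here, defer is a lazy wrapper around fromIterable(): the supplier, and therefore findAll(), is not invoked until a subscriber subscribes to the Flux. subscribeOn(Schedulers.boundedElastic()) then makes that subscription, including the blocking call, run on a worker thread of the bounded elastic Scheduler instead of the caller's thread.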
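The laziness and the thread switch can be observed directly. The following is a minimal sketch, assuming reactor-core is on the classpath; DeferLazinessSketch, slowFindAll(), calls and callerThread are hypothetical names introduced only for this illustration, standing in for the repository above.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class DeferLazinessSketch {

    // Hypothetical instrumentation: how often the blocking call ran, and on which thread
    static final AtomicInteger calls = new AtomicInteger();
    static volatile String callerThread;

    // Hypothetical stand-in for BlockingRepository.findAll()
    static Iterable<String> slowFindAll() {
        calls.incrementAndGet();
        callerThread = Thread.currentThread().getName();
        try {
            Thread.sleep(200); // shortened delay for the demo
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.of("Austin", "Don");
    }

    // Same shape as blockingRepositoryToFlux(): defer + subscribeOn
    static Flux<String> wrapped() {
        return Flux
            .defer(() -> Flux.fromIterable(slowFindAll()))
            .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        Flux<String> flux = wrapped();
        // Assembling the Flux does not call slowFindAll()
        System.out.println("calls after assembly: " + calls.get());

        // block() subscribes and waits; only now does slowFindAll() run
        List<String> names = flux.collectList().block();
        System.out.println("calls after subscribe: " + calls.get());
        System.out.println(names + " produced on thread " + callerThread);
    }
}
```

Note that defer re-runs its supplier for every new subscriber, so subscribing twice to the same Flux would call slowFindAll() twice.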
As a result, the call no longer blocks the calling thread:
@Test
void blockingRepositoryToFlux_success() throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    blockingToReactiveApp.blockingRepositoryToFlux(blockingRepository)
        .doOnComplete(countDownLatch::countDown)
        .subscribe(System.out::println);
    countDownLatch.await();
}
Here the CountDownLatch only keeps the test thread alive until the Flux completes; subscribe() itself returns immediately.
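As an alternative way to wait in a test, the Flux can be blocked on with a timeout instead of a latch. This is a sketch only, assuming reactor-core is available; BlockingWaitSketch, slowNames() and loadNames() are hypothetical stand-ins for blockingRepositoryToFlux() and the repository.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class BlockingWaitSketch {

    // Hypothetical blocking source, standing in for findAll()
    static List<String> loadNames() {
        try {
            Thread.sleep(200); // shortened delay for the demo
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.of("Austin", "Don", "David", "Peter");
    }

    // Same defer + subscribeOn shape as blockingRepositoryToFlux()
    static Flux<String> slowNames() {
        return Flux
            .defer(() -> Flux.fromIterable(loadNames()))
            .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        // blockLast(Duration) subscribes, waits for completion, and returns the
        // last element; it throws if the timeout elapses first
        String last = slowNames()
            .doOnNext(System.out::println)
            .blockLast(Duration.ofSeconds(5));
        System.out.println("last = " + last);
    }
}
```

Unlike the latch with doOnComplete, this variant cannot wait forever if the Flux terminates with an error or never completes, because the error is rethrown and the timeout bounds the wait.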