Blocking To Flux

We can convert a blocking operation into a Flux. For example, consider the following code:

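import java.util.Collections;
import java.util.List;

public class BlockingRepository<T> {
  // Simulates a slow, blocking data source.
  public Iterable<User> findAll() {
    try {
      Thread.sleep(5000); // blocks the calling thread for 5 seconds
      return List.of(
          new User("Austin"),
          new User("Don"),
          new User("David"),
          new User("Peter")
      );
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      return Collections.emptyList();
    }
  }
}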

When we call blockingRepository.findAll(), the calling thread has to wait 5 seconds for the operation to complete.

The call itself still blocks, but we can keep it off the calling thread by using subscribeOn() to run the work on a Scheduler:

Flux<User> blockingRepositoryToFlux(BlockingRepository<User> blockingRepository) {  
  return Flux  
      .defer(() -> Flux.fromIterable(blockingRepository.findAll()))  
      .subscribeOn(Schedulers.boundedElastic());  
}

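Here, defer() makes the creation of the Flux lazy: findAll() is not called when the Flux is assembled, only when a subscriber subscribes to it. subscribeOn(Schedulers.boundedElastic()) then runs that subscription, and with it the blocking call, on a worker thread meant for blocking work.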
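The difference between eager and deferred creation can be seen in a small sketch. It assumes reactor-core is on the classpath; the class name DeferLazinessSketch, the call counter, and the String-based findAll() stand-in are hypothetical and only serve the illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class DeferLazinessSketch {
  // Counts how many times the stand-in data source is invoked.
  static final AtomicInteger calls = new AtomicInteger();

  // Hypothetical stand-in for a blocking findAll(); no sleep, to keep the sketch fast.
  static List<String> findAll() {
    calls.incrementAndGet();
    return List.of("Austin", "Don");
  }

  public static void main(String[] args) {
    // Eager: findAll() runs right here, while the Flux is being assembled.
    Flux<String> eager = Flux.fromIterable(findAll());
    System.out.println("after eager assembly: " + calls.get());    // 1

    // Lazy: the supplier passed to defer() has not been invoked yet.
    Flux<String> lazy = Flux.defer(() -> Flux.fromIterable(findAll()));
    System.out.println("after lazy assembly: " + calls.get());     // still 1

    // Every subscription invokes the supplier once.
    lazy.blockLast();
    lazy.blockLast();
    System.out.println("after two subscriptions: " + calls.get()); // 3
  }
}
```

Without defer(), the blocking call would run on whichever thread builds the Flux, before subscribeOn() has any chance to move it elsewhere.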

With subscribeOn() in place, calling blockingRepositoryToFlux() and subscribing no longer blocks the caller:

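@Test
void blockingRepositoryToFlux_success() throws InterruptedException {
  CountDownLatch countDownLatch = new CountDownLatch(1);

  blockingToReactiveApp.blockingRepositoryToFlux(blockingRepository)
      // doFinally also fires on error or cancel, so the latch is always released
      .doFinally(signalType -> countDownLatch.countDown())
      .subscribe(System.out::println);

  // subscribe() returns immediately; wait (with a timeout) for the background work
  countDownLatch.await(10, TimeUnit.SECONDS);
}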

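Here, the CountDownLatch only keeps the test thread waiting until the Flux terminates. Without it, the test method would return before the background thread has emitted anything.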
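To check that the blocking call really leaves the calling thread, one option is to have the stand-in source report the thread it ran on. The sketch below assumes reactor-core is available; SubscribeOnThreadSketch, workerThreadName(), and the shortened 200 ms sleep are hypothetical. It also waits with blockFirst(Duration) instead of a latch, which is a simpler way to wait in a test or a main method.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnThreadSketch {
  // Hypothetical stand-in for the blocking repository call (short sleep).
  static List<String> findAll() {
    try {
      Thread.sleep(200);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    // Report the name of the thread that executed the blocking call.
    return List.of(Thread.currentThread().getName());
  }

  // Runs findAll() through defer + subscribeOn and returns the worker thread's name.
  static String workerThreadName() {
    return Flux.defer(() -> Flux.fromIterable(findAll()))
        .subscribeOn(Schedulers.boundedElastic())
        .blockFirst(Duration.ofSeconds(5)); // wait at most 5 seconds for the first element
  }

  public static void main(String[] args) {
    System.out.println("caller: " + Thread.currentThread().getName());
    System.out.println("worker: " + workerThreadName());
  }
}
```

The two printed names should differ, with the worker being one of the scheduler's own threads rather than the caller.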