subscribeOn vs publishOn

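subscribeOn and publishOn both move work onto a Scheduler, but they differ in scope. subscribeOn changes the thread on which the source is subscribed to, so it affects the whole chain starting from the source, regardless of where it is placed (up to the next publishOn, if there is one). For example: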

someFlux
	.operation1()
	.subscribeOn(Schedulers.boundedElastic()) // position in the chain doesn't matter:
	                                          // the whole chain runs on boundedElastic()
	.operation2()
	.operation3()

In this case, it doesn't matter where we put .subscribeOn(): operation1, operation2 and operation3 are all executed on Schedulers.boundedElastic().

However, if we use publishOn(), only the operations that come after it are affected:

someFlux.operation1()
		.publishOn(Schedulers.boundedElastic())
		.operation2()
		.operation3()

In this case, only operation2 and operation3 are executed on Schedulers.boundedElastic(); operation1 stays on whatever thread the source emits on.
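
The difference can be observed by recording the thread name on both sides of the scheduler switch. The following is a minimal sketch, assuming reactor-core is on the classpath; the class name SchedulerPlacementDemo, the trace method and the "before"/"after" labels are hypothetical names used only for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SchedulerPlacementDemo {

  // Records "<label>:<thread name>" for every element, once above and
  // once below the operator that switches the scheduler.
  static List<String> trace(boolean usePublishOn) {
    List<String> log = new CopyOnWriteArrayList<>();

    // Step placed BEFORE the scheduler switch (plays the role of operation1).
    Flux<Integer> source = Flux.range(1, 2)
        .doOnNext(i -> log.add("before:" + Thread.currentThread().getName()));

    Flux<Integer> switched = usePublishOn
        ? source.publishOn(Schedulers.boundedElastic())
        : source.subscribeOn(Schedulers.boundedElastic());

    // Step placed AFTER the scheduler switch (plays the role of operation2).
    switched
        .doOnNext(i -> log.add("after:" + Thread.currentThread().getName()))
        .blockLast(); // subscribe from the calling thread and wait for completion

    return log;
  }

  public static void main(String[] args) {
    System.out.println("publishOn:   " + trace(true));
    System.out.println("subscribeOn: " + trace(false));
  }
}
```

With publishOn, the "before" entries should carry the name of the calling thread and only the "after" entries a boundedElastic thread; with subscribeOn, every entry should carry a boundedElastic thread name.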

Example

Consider the following repository:

public class BlockingRepository<T> {  
  public void save(User user) {  
    try {  
      Thread.sleep(5000);  
      System.out.println("Saved: " + user.username + " " + Thread.currentThread());  
    } catch (InterruptedException e) {  
      Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it  
    }  
  }  
}

and a method that saves each user of a Flux through it:

Mono<Void> fluxToBlockingRepositoryParallel(Flux<User> flux, BlockingRepository<User> repository) {  
  return flux  
	.flatMap(user -> Mono.fromRunnable(  
		() -> repository.save(user)  
	).subscribeOn(Schedulers.boundedElastic()))  
	.then();
}

Here repository.save is an expensive blocking call, so each call is wrapped in its own Mono and subscribed on Schedulers.boundedElastic(). Because flatMap subscribes to the inner Monos eagerly, the three saves run concurrently, each on its own worker thread.

As a result, the following test:

@Test  
void fluxToBlockingRepositoryParallel_success() throws InterruptedException {  
  Flux<User> userFlux = Flux.just(  
      new User("austin"),  
      new User("david"),  
      new User("edward")  
  );  
  
  CountDownLatch countDownLatch = new CountDownLatch(1);  
  
  blockingToReactiveApp.fluxToBlockingRepositoryParallel(userFlux, blockingRepository)  
      .doFinally((r) -> countDownLatch.countDown()).subscribe();  
  
  countDownLatch.await();  
}

finishes in about 5 seconds even though there are 3 users.

For the same scenario, consider the following code instead:

Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {  
  return flux  
      .publishOn(Schedulers.boundedElastic())  
      .doOnNext(repository::save).then();  
}

Here publishOn moves the elements of the flux onto a single Schedulers.boundedElastic() worker. However, doOnNext is synchronous: it calls repository.save() directly on that one worker thread, one element at a time. Each save therefore blocks for 5 seconds before the next one starts, which adds up to about 15 seconds for 3 users.

@Test
void fluxToBlockingRepository_success() throws InterruptedException {
  CountDownLatch countDownLatch = new CountDownLatch(1);

  Flux<User> userFlux = Flux.just(
      new User("austin"),
      new User("david"),
      new User("edward")
  );

  blockingToReactiveApp.fluxToBlockingRepository(userFlux, blockingRepository)
      .doFinally((t) -> countDownLatch.countDown())
      .subscribe(); // returns immediately: the saves are slow and sequential, but the caller is not blocked

  countDownLatch.await();
}

Running it shows all three saves on the same thread, one after another:

> Task :app:testClasses
Saved: austin Thread[boundedElastic-1,5,main]
Saved: david Thread[boundedElastic-1,5,main]
Saved: edward Thread[boundedElastic-1,5,main]
> Task :app:test
BUILD SUCCESSFUL in 15s
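
The two timings can also be reproduced side by side in one self-contained program. This is only a sketch under stated assumptions: reactor-core is on the classpath, a 200 ms sleep stands in for the 5-second save, plain strings stand in for User, and SaveTimingDemo, slowSave, parallelMillis and sequentialMillis are hypothetical names that do not appear in the code above.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class SaveTimingDemo {
  // Assumed delay, shortened from the 5 seconds used in the repository above.
  static final long DELAY_MS = 200;

  // Stand-in for repository.save: blocks the calling thread.
  static void slowSave(String user) {
    try {
      Thread.sleep(DELAY_MS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // flatMap + subscribeOn: every save is subscribed on its own worker,
  // so the saves overlap in time.
  static long parallelMillis(List<String> users) {
    long start = System.nanoTime();
    Flux.fromIterable(users)
        .flatMap(u -> Mono.fromRunnable(() -> slowSave(u))
            .subscribeOn(Schedulers.boundedElastic()))
        .then()
        .block();
    return (System.nanoTime() - start) / 1_000_000;
  }

  // publishOn + doOnNext: every save runs on the same worker,
  // one element after another.
  static long sequentialMillis(List<String> users) {
    long start = System.nanoTime();
    Flux.fromIterable(users)
        .publishOn(Schedulers.boundedElastic())
        .doOnNext(SaveTimingDemo::slowSave)
        .then()
        .block();
    return (System.nanoTime() - start) / 1_000_000;
  }

  public static void main(String[] args) {
    List<String> users = List.of("austin", "david", "edward");
    System.out.println("flatMap + subscribeOn: " + parallelMillis(users) + " ms");
    System.out.println("publishOn + doOnNext:  " + sequentialMillis(users) + " ms");
  }
}
```

The first figure should be close to a single delay and the second close to three delays, mirroring the 5-second and 15-second results above. Exact numbers vary from run to run, and the first pipeline executed also pays for class loading.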