subscribeOn vs publishOn
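subscribeOn and publishOn both move work onto a Scheduler, but they differ in scope. subscribeOn changes the thread on which the subscription, and therefore the source emission, happens. It affects the chain from the source downwards no matter where it appears, until a publishOn switches threads again. For example: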
someFlux
    .operation1()
    // position in the chain does not matter: the subscription still runs on boundedElastic
    .subscribeOn(Schedulers.boundedElastic())
    .operation2()
    .operation3()
In this case it does not matter where we put .subscribeOn(): operation1, operation2 and operation3 are all handled by Schedulers.boundedElastic().
However, if we use publishOn(), only the operators after it are affected:
someFlux
    .operation1()
    .publishOn(Schedulers.boundedElastic())
    .operation2()
    .operation3()
In this case, only operation2 and operation3 are handled by Schedulers.boundedElastic(); operation1 still runs on the thread that subscribed.
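The difference in scope can be checked by recording which thread runs each step. The following is a minimal sketch, assuming reactor-core is on the classpath; the class name SchedulerPlacementDemo, the helper threadKind and the step labels are made up for illustration and are not part of the code discussed in this article.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class SchedulerPlacementDemo {

    // Hypothetical helper: labels the current thread by the scheduler that owns it.
    static String threadKind() {
        String name = Thread.currentThread().getName();
        if (name.startsWith("boundedElastic")) return "boundedElastic";
        if (name.startsWith("parallel")) return "parallel";
        return "caller"; // the thread that called blockLast()
    }

    // subscribeOn placed after step1: both steps still run on boundedElastic.
    static List<String> withSubscribeOn() {
        List<String> log = new CopyOnWriteArrayList<>();
        Flux.just("x")
            .map(v -> { log.add("step1:" + threadKind()); return v; })
            .subscribeOn(Schedulers.boundedElastic())
            .map(v -> { log.add("step2:" + threadKind()); return v; })
            .blockLast();
        return log;
    }

    // publishOn placed after step1: only step2 moves to boundedElastic.
    static List<String> withPublishOn() {
        List<String> log = new CopyOnWriteArrayList<>();
        Flux.just("x")
            .map(v -> { log.add("step1:" + threadKind()); return v; })
            .publishOn(Schedulers.boundedElastic())
            .map(v -> { log.add("step2:" + threadKind()); return v; })
            .blockLast();
        return log;
    }

    // Both together: subscribeOn covers the source side, publishOn takes over afterwards.
    static List<String> withBoth() {
        List<String> log = new CopyOnWriteArrayList<>();
        Flux.just("x")
            .map(v -> { log.add("step1:" + threadKind()); return v; })
            .subscribeOn(Schedulers.boundedElastic())
            .publishOn(Schedulers.parallel())
            .map(v -> { log.add("step2:" + threadKind()); return v; })
            .blockLast();
        return log;
    }

    public static void main(String[] args) {
        System.out.println("subscribeOn: " + withSubscribeOn());
        System.out.println("publishOn:   " + withPublishOn());
        System.out.println("both:        " + withBoth());
    }
}
```

The third variant illustrates the one caveat to "position does not matter": once a publishOn appears, the operators after it run on the publishOn scheduler rather than on the one chosen by subscribeOn.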
Example
Consider the following repository:
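public class BlockingRepository<T> {
    // Simulates a slow, blocking write by sleeping for 5 seconds.
    public void save(User user) {
        try {
            Thread.sleep(5000);
            System.out.println("Saved: " + user.username + " " + Thread.currentThread());
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}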
Mono<Void> fluxToBlockingRepositoryParallel(Flux<User> flux, BlockingRepository<User> repository) {
    return flux
        .flatMap(user -> Mono.fromRunnable(() -> repository.save(user))
            .subscribeOn(Schedulers.boundedElastic()))
        .then();
}
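Here repository.save is an expensive blocking call, so each call is wrapped in its own Mono and subscribed on Schedulers.boundedElastic(). Because flatMap subscribes to those inner Monos eagerly (up to its concurrency limit), the saves run concurrently on separate boundedElastic threads instead of one after another.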
As a result, the following test:
@Test
void fluxToBlockingRepositoryParallel_success() throws InterruptedException {
    Flux<User> userFlux = Flux.just(
        new User("austin"),
        new User("david"),
        new User("edward")
    );
    CountDownLatch countDownLatch = new CountDownLatch(1);
    blockingToReactiveApp.fluxToBlockingRepositoryParallel(userFlux, blockingRepository)
        .doFinally(signal -> countDownLatch.countDown())
        .subscribe();
    countDownLatch.await(); // keep the test alive until the pipeline terminates
}
completes in about 5 seconds even though we have 3 users.
For the same scenario, consider the following code:
Mono<Void> fluxToBlockingRepository(Flux<User> flux, BlockingRepository<User> repository) {
    return flux
        .publishOn(Schedulers.boundedElastic())
        .doOnNext(repository::save)
        .then();
}
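Here every element of the flux is published on a Schedulers.boundedElastic() thread. However, doOnNext is synchronous: it calls repository.save() directly on that single thread, and the next element is not delivered until the call returns. Each save therefore still takes its full 5 seconds, one after another, which adds up to 15 seconds for 3 users.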
@Test
void fluxToBlockingRepository_success() throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    Flux<User> userFlux = Flux.just(
        new User("austin"),
        new User("david"),
        new User("edward")
    );
    blockingToReactiveApp.fluxToBlockingRepository(userFlux, blockingRepository)
        .doFinally(signal -> countDownLatch.countDown())
        // subscribe() returns immediately even without parallelism:
        // the saves are slow, but they run off the calling thread
        .subscribe();
    countDownLatch.await();
}
}
> Task :app:testClasses
Saved: austin Thread[boundedElastic-1,5,main]
Saved: david Thread[boundedElastic-1,5,main]
Saved: edward Thread[boundedElastic-1,5,main]
> Task :app:test
BUILD SUCCESSFUL in 15s
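The contrast between the two variants can also be observed without waiting 15 seconds, by counting how many saves are in flight at the same time. This is only a sketch, assuming reactor-core is on the classpath; SaveConcurrencyDemo and SlowStore are hypothetical stand-ins for the application class and BlockingRepository above, and the sleep is shortened to 200 ms.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.concurrent.atomic.AtomicInteger;

public class SaveConcurrencyDemo {

    // Hypothetical stand-in for BlockingRepository.save: sleeps briefly and
    // tracks the highest number of calls that overlapped in time.
    static final class SlowStore {
        private final AtomicInteger inFlight = new AtomicInteger();
        private final AtomicInteger maxInFlight = new AtomicInteger();

        void save(String name) {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(200); // shortened from the 5000 ms used in the text
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                inFlight.decrementAndGet();
            }
        }

        int maxInFlight() {
            return maxInFlight.get();
        }
    }

    // publishOn + doOnNext: a single worker thread, so saves run one after another.
    static int maxConcurrencySequential() {
        SlowStore store = new SlowStore();
        Flux.just("austin", "david", "edward")
            .publishOn(Schedulers.boundedElastic())
            .doOnNext(store::save)
            .then()
            .block();
        return store.maxInFlight();
    }

    // flatMap + subscribeOn: each save gets its own inner Mono and its own worker.
    static int maxConcurrencyParallel() {
        SlowStore store = new SlowStore();
        Flux.just("austin", "david", "edward")
            .flatMap(name -> Mono.fromRunnable(() -> store.save(name))
                .subscribeOn(Schedulers.boundedElastic()))
            .then()
            .block();
        return store.maxInFlight();
    }

    public static void main(String[] args) {
        System.out.println("publishOn + doOnNext, max saves in flight: " + maxConcurrencySequential());
        System.out.println("flatMap + subscribeOn, max saves in flight: " + maxConcurrencyParallel());
    }
}
```

The sequential variant should never report more than one save in flight, while the flatMap variant is expected to report several overlapping saves, which is why its total time stays close to the duration of a single save.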