StepVerifier

StepVerifier can be used to perform unittesting on Reactor.

For example consider the following Flux

public class FluxVerifier {
	public Flux<String> fooBarFlux() {  
	  return Flux.just("foo", "bar");  
	}
}

We can write our unit test like this:

@Test  
public void givenFlux_expect_success() {  
  StepVerifier.create(fluxVerifier.fooBarFlux())
			  .expectNext("foo")
			  .expectNext("bar")
			  .verifyComplete();  
}

We can also provide our own Consumer to use our custom assertion function (AssertJ, JUnit,…)

@Test  
public void givenUserFlux_verify() {  
  StepVerifier.create(fluxVerifier.userFlux())  
      .assertNext(user -> assertThat(user.username).isEqualTo("swhite"))  
      .assertNext(user -> assertThat(user.username).isEqualTo("jpinkman"))  
      .verifyComplete();  
}

withVirtualTime

We can use withVirtualTime() to overwrite the Scheduler from Reactor. Which allows us to skip time.

For example consider the following case:

public Flux<Long> longFlux() {  
  return Flux.interval(Duration.ofMillis(1000)).take(10);  
}

Which takes time to finish the whole thing. In which we can use the following to skip through the time.

@Test  
public void givenLongFlux_withVirtualTime_verify() {  
  StepVerifier.withVirtualTime(() -> fluxVerifier.longFlux())  
      .expectSubscription()  
      .thenAwait(Duration.ofMillis(10 * 1000))  
      .expectNextCount(10)  
      .verifyComplete();  
}

The thenAwait() will stimulate a time bypass, which allows us to skip through 10000 ms and finish our Flux immediately. Therefore we can finish running the code.

Control the rate of consuming

We can use thenRequest(n) to consume the next n events. For example:

@Test  
void fourUserFlux_requestOne_expectCorrect() {  
  StepVerifier.create(requestApp.fourUserFlux())  
      .thenRequest(1)  
      .assertNext(user -> assertThat(user.username).isEqualTo("user1"))  
      .thenRequest(1)  
      .assertNext(user -> assertThat(user.username).isEqualTo("user2"))  
      .thenCancel().verify();  
}