Flux

Represents 0 to N elements

Pasted image 20220912195202.png

If there is error, the flux will not stream anymore data

Instance methods

toIterable()

Convert the current flux to Iterable<T>. For example:

public Iterable<User> fluxToValues(Flux<User> userFlux) {  
  return userFlux.toIterable();  
}
@Test  
void fluxToIterable_success() {  
  var userIterable = blockingApp.fluxToValues(  
      Flux.just(  
          new User("Austin"),  
          new User("Partir")  
      )  
  );  
  
  userIterable.forEach(System.out::println);  
}
> Task :app:testClasses
username: Austin
firstname: null
lastName: null

username: Partir
firstname: null
lastName: null

ignoreElements

When call flux.ignoreElements() it will return a Mono<T> if you want to keep the type.

The Mono<T> simply doesn't contain any data, it will just be used to signal that the flux has been completed.

static Mono<User> fluxIgnoreElements(Flux<User> userFlux) {  
  // Should return an empty mono of type Mono<User>  
  return userFlux.ignoreElements();  
}
static void fluxIgnoreElementsMain() {  
  var userFlux = Flux.just(  
    new User("Austin"),  
    new User("Tet"),  
    new User("ook")  
  );  
  
  fluxIgnoreElements(userFlux).log().subscribe(System.out::println); // should not print  
}
> Task :app:OtherOperationApp.main()
[ INFO] (main) onSubscribe(MonoIgnoreElements.IgnoreElementsSubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onComplete()

then

Similar to ignoreElements() but this will return a Mono<Void>. Can be used if you don't need a Mono<T>

static Mono<Void> fluxIgnoreGetStatusOnly(Flux<User> userFlux) {  
  return userFlux.then();  
}
static void fluxIgnoreGetStatusOnly() {  
  var userFlux = Flux.just(  
      new User("Austin"),  
      new User("Tet"),  
      new User("ook")  
  );  
  
  fluxIgnoreGetStatusOnly(userFlux).log().subscribe(System.out::println); // should not print  
}
> Task :app:OtherOperationApp.main()
[ INFO] (main) onSubscribe(MonoIgnoreElements.IgnoreElementsSubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onComplete()

collect

Convert to a Mono<List<T>> or Mono<? extends Collectors> type

static Mono<List<User>> fluxCollection(Flux<User> flux) {  
  return flux.collect(Collectors.toList());  
  // or flux.collectList()  
}
static void fluxCollectionMain() {  
  var userFlux = Flux.just(  
      new User("Austin"),  
      new User("Tet"),  
      new User("ook")  
  );  
  fluxCollection(userFlux).subscribe(System.out::println);  
}
> Task :app:OtherOperationApp.main()
[username: Austin
firstname: null
lastName: null
, username: Tet
firstname: null
lastName: null
, username: ook
firstname: null
lastName: null
]

Static method

firstWithValue

Same as Mono > firstWithValue but for Flux