What is Reactor?

Reactor is a library from the Pivotal Open Source team that implements the Reactive Streams specification and brings the reactive programming paradigm to the JVM. It is a fully non-blocking foundation for reactive programming with efficient demand management (in the form of managing “backpressure”).

Reactive Streams Specification

The Reactive Streams specification is the result of a collaborative effort by engineers from Pivotal, Netflix, Red Hat, Twitter and many other companies. It defines the minimum set of interfaces required for asynchronous processing of large, potentially unbounded volumes of data. The main goal of the specification is to standardize the exchange of stream data across asynchronous boundaries in applications. The API consists of four interfaces:

  1. Publisher: The publisher generates a potentially unbounded number of events asynchronously and pushes them to its subscribers.
  2. Subscriber: The subscriber consumes the events published by a publisher. It is notified of four kinds of events: subscription, new data, completion, and an error if one occurs in the publisher. The subscriber chooses which action to take on each of these events.
  3. Subscription: The subscription is a shared context between the publisher and the subscriber. It is available to the subscriber only, and gives it the ability to control the flow of events coming from the publisher. The subscription ends when the publisher signals completion or an error. The subscriber can also cancel the subscription manually, after which it stops receiving events from the publisher.
  4. Processor: It represents a stage of data processing between the publisher and the subscriber, acting as both a subscriber and a publisher.

Besides the interfaces, the specification contains around 40 rules about the data exchange between the publisher and the subscriber. These rules are based on two principles: asynchronous processing and subscriber backpressure.
You can read more about the specification at http://www.reactive-streams.org/.
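
To make the four roles more concrete, here is a minimal sketch that uses java.util.concurrent.Flow, the copy of the Reactive Streams interfaces that ships with JDK 9 and later, so it runs without any extra dependency. The class names FlowBackpressureSketch and OneByOneSubscriber are made up for this illustration, and the JDK's SubmissionPublisher simply stands in for a publisher. The subscriber asks for one element at a time, which is the simplest form of backpressure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowBackpressureSketch {

    // Hypothetical subscriber: requests one element at a time and records every signal.
    static final class OneByOneSubscriber implements Flow.Subscriber<String> {
        final List<String> signals = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);      // initial demand: a single element
        }

        @Override
        public void onNext(String item) {
            signals.add("next:" + item);
            subscription.request(1);      // ready for the next element
        }

        @Override
        public void onError(Throwable error) {
            signals.add("error:" + error.getMessage());
            done.countDown();
        }

        @Override
        public void onComplete() {
            signals.add("complete");
            done.countDown();
        }
    }

    static List<String> run() {
        OneByOneSubscriber subscriber = new OneByOneSubscriber();
        // close() (called by try-with-resources) signals onComplete after the submitted items
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("Red");
            publisher.submit("Black");
        }
        try {
            // Signals are delivered on another thread, so wait for the terminal event
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.signals;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [next:Red, next:Black, complete]
    }
}
```

Reactor's Flux and Mono implement the org.reactivestreams.Publisher interface rather than Flow.Publisher, but the contract between publisher, subscriber and subscription is the same.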

As I mentioned above, Reactor is an implementation of this specification. The library helps us build reactive applications easily, handle requests, and handle backpressure.

Understanding the BOM

Reactor has used a BOM (Bill of Materials) model since reactor-core 3.0.4. The BOM is itself versioned, using a release train scheme with a codename followed by a qualifier. Here are a few examples:

Aluminium-RELEASE
Aluminium-SR1
Californium-BUILD-SNAPSHOT
Californium-SR32

The codenames (mostly taken from the periodic table of elements) represent what would traditionally be the MAJOR.MINOR number. The qualifiers are:

  • BUILD-SNAPSHOT
  • M1..N: Milestones or developer previews
  • RELEASE: The first GA (General Availability) release in a codename series
  • SR1..N: The subsequent GA releases in a codename series (equivalent to PATCH number, SR stands for “Service Release”)

The BOM concept is natively supported by Maven. First, you need to import the BOM by adding the following snippet to your dependencyManagement section in the pom.xml:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>Californium-SR5</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Next, add dependencies on the Reactor projects you want to use, omitting the <version> element because the BOM supplies it:

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId> 
        <scope>test</scope>
    </dependency>
</dependencies>

In this example, we added the following two dependencies:

  • reactor-core – dependency on the core library
  • reactor-test – provides facilities to unit test reactive streams

Publishing Data

Reactor offers two asynchronous sequence APIs that implement the Reactive Streams Publisher interface: Flux for publishing [0..N] elements and Mono for publishing [0..1] elements.

Before talking about Flux and Mono, it is important to know some concepts about streams. A stream emits events, which can be:

  • value: every Publisher<T> emits values of type T
  • terminal event: ends the stream

A terminal event can be either:

  • completion: tells that the publisher published all of its values.
  • error: refers to an erroneous termination of the stream

All the events described above are optional, so we can have four types of streams:

  • Empty stream: A publisher that generates no values, only a terminal event (completion or error)
  • Finite stream: A publisher that generates a finite number N of values, followed by a terminal event
  • Infinite stream: A publisher generating only value events, and no terminal events
  • Infinite empty stream: A stream generating no value events and no terminating events

The Flux<T> API

Flux is a Reactive Streams Publisher that emits an asynchronous sequence of 0 to N elements and then completes (successfully or with an error). It can also be an infinite publisher, which keeps emitting elements and never sends a completion event. As in the Reactive Streams spec, these three types of signal (element, successful completion, and error) translate to calls to a downstream Subscriber’s onNext, onComplete or onError methods.

Flux.just()

Creates a finite Flux from the given values: it emits the provided elements and then completes.

Flux.just("Red");
Flux.just("White", "Black", "Yellow", "Red");
Flux.just(new Product("Apple iPhone XS", 999.99), new Product("Apple iPhone XS Max", 1299.99));

We can also create a Flux from other types, such as arrays, streams and iterables:

  • Flux.fromArray(): This is used to build a stream from an array of a type T.
  • Flux.fromIterable(): This is used to build a stream from collections.
  • Flux.fromStream(): This is used to build a Flux from an existing Java 8+ stream or a Java 8+ stream supplier.

Flux.fromArray(new Integer[]{1, 1, 2, 3, 5, 8});
Flux.fromIterable(Arrays.asList("White", "Black", "Yellow", "Red"));
Flux.fromStream(Stream.of("a1", "a2", "a3"));

Flux has many other methods, such as:

  • Flux.from: This method takes an existing reactive publisher and generates a Flux from it.
  • Flux.count(): Counts the number of values in this Flux.
  • Flux.interval(Duration period): Create a Flux that emits long values starting with 0 and incrementing at specified time intervals on the global timer.
  • Flux.empty(): This method generates an empty stream with no values and only completion.
  • Flux.error(Throwable error): Create a Flux that terminates with the specified error immediately after being subscribed to.
  • Flux.never(): Create a Flux that will never signal any data, error or completion signal.
  • Flux.defer: Lazily supplies a Publisher each time a subscription is made on the resulting Flux, so the actual source is instantiated only at subscription time and the Supplier can create a subscriber-specific instance.
  • Flux.delayElements(Duration delay): Delay each of this Flux elements (Subscriber.onNext(T) signals) by a given Duration.
  • Flux.log(): Observe all Reactive Streams signals and trace them using Logger support.
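
The difference between an eagerly assembled Flux and Flux.defer is easy to miss, so here is a small sketch of it. It assumes reactor-core is on the classpath (as set up with the BOM above); the class name DeferSketch and the counter are only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

class DeferSketch {

    static List<Integer> run() {
        AtomicInteger counter = new AtomicInteger();

        // just() captures its value once, when the Flux is assembled
        Flux<Integer> eager = Flux.just(counter.incrementAndGet());

        // defer() runs the supplier again for every new subscriber
        Flux<Integer> lazy = Flux.defer(() -> Flux.just(counter.incrementAndGet()));

        List<Integer> seen = new ArrayList<>();
        eager.subscribe(seen::add); // 1
        eager.subscribe(seen::add); // 1 again, the same captured value
        lazy.subscribe(seen::add);  // 2, supplier invoked at subscription time
        lazy.subscribe(seen::add);  // 3, supplier invoked once more
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 1, 2, 3]
    }
}
```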

You can check the Reactor Flux API page for more methods.

The Mono<T> API

Mono is a Reactive Streams Publisher that emits at most one item and then optionally terminates with an onComplete signal or an onError signal. It offers a subset of the operators that are available for a Flux.

Mono.just()

Create a new Mono that emits the specified item, and then terminates with a complete signal.

Mono.just() also has related variants:

  • Mono.justOrEmpty(T data): Create a new Mono that emits the specified item if it is non-null, otherwise only emits onComplete.
  • Mono.justOrEmpty(Optional data): Create a new Mono that emits the specified item if Optional.isPresent(), otherwise only emits onComplete.

Mono.just("Red");
Mono.justOrEmpty(value);            // value may be null
Mono.justOrEmpty(Optional.empty()); // completes without emitting a value

You can also create a Mono from other types, such as a Future or a Callable:

  • Mono.from: Expose the specified Publisher with the Mono API, and ensure it will emit 0 or 1 item.
  • Mono.fromCallable: This method generates a Mono with one value, followed by the completion event. If the Callable returns a multi-valued dataset, such as an array or a collection, the complete dataset is pushed as a single object in one event.
  • Mono.fromFuture: This method generates a Mono with the one value produced by a CompletableFuture, followed by the completion event.
  • Mono.fromSupplier: This method generates a Mono with the one value provided by a Supplier, followed by the completion event.
  • Mono.fromRunnable: This method generates a Mono with no value and only a completion event.
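
Here is a short sketch of these factory methods side by side. It assumes reactor-core is on the classpath; the class name MonoFactoriesSketch is made up, and block() is used only to pull the values out for the demonstration, which is not something you would normally do inside reactive code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Mono;

class MonoFactoriesSketch {

    static String run() {
        // The whole list is emitted as ONE element, not as two separate values
        Mono<List<String>> fromCallable = Mono.fromCallable(() -> Arrays.asList("White", "Black"));
        Mono<String> fromSupplier = Mono.fromSupplier(() -> "Red");
        Mono<String> fromFuture = Mono.fromFuture(CompletableFuture.completedFuture("Yellow"));
        // Runs the side effect on subscription, then completes without a value
        Mono<Object> fromRunnable = Mono.fromRunnable(() -> System.out.println("side effect only"));

        return fromCallable.block()
                + " | " + fromSupplier.block()
                + " | " + fromFuture.block()
                + " | " + fromRunnable.hasElement().block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [White, Black] | Red | Yellow | false
    }
}
```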

Like Flux, Mono also has many other methods, such as:

  • Mono.flux(): Convert this Mono to a Flux.
  • Mono.hasElement(): Emit a single boolean true if this Mono has an element.
  • Mono.empty(): Generates a stream with no value and only a completion.
  • Mono.error(): Generates a stream with no value and only a specified error.
  • Mono.never(): Generates a stream with no events at all. It does not generate an event of any type.
  • Mono.log(): Observe all Reactive Streams signals and trace them using Logger support.

Check the Mono API page for more methods.

Subscribing to publishers

Publishers (Flux or Mono) won’t do anything until you subscribe to them. Once you subscribe, the publisher starts emitting its events to the subscriber: subscription, values, completion and error. The subscriber decides what to do with each of these events.

Both Flux and Mono have several variants of subscribe, as follows:

1. subscribe();

2. subscribe(Consumer<? super T> consumer);

3. subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer);

4. subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer);

5. subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); 

The subscribe variants above do the following, respectively:

  1. Subscribe and trigger the sequence but no event is consumed.
  2. Do something with each produced value while ignoring the error, complete and subscription events.
  3. Consume the values but also react to the error event
  4. Deal with values and errors but also execute some code when the sequence successfully completes.
  5. Deal with values and errors and successful completion but also do something with the Subscription produced by this subscribe call.

Here are some examples for the subscribe methods:

Subscribe and handle only the emitted values:

Flux.just("White", "Black", "Yellow", "Red").subscribe(System.out::println);
__________________
Output:
White
Black
Yellow
Red

Subscribe and handle the emitted values and an error:

Flux.range(1, 4).map(i -> {
      if (i <= 3) {
        return i;
      }
      throw new RuntimeException("ERROR: got 4");
    }).subscribe(
        System.out::println, 
        error -> System.out.println("Exception thrown: " + error));
__________________
Output:
1
2
3
Exception thrown: java.lang.RuntimeException: ERROR: got 4

Subscribe and handle the emitted values, an error and the completion event:

Flux.range(1, 4).subscribe(
        i -> System.out.println("consuming: " + i),
        error -> System.out.println("error: " + error),
        () -> System.out.println("Stream completed")
    );
__________________
Output:
consuming: 1
consuming: 2
consuming: 3
consuming: 4
Stream completed

Subscribe and handle the emitted values, an error, the completion event and the subscription:

Flux.range(1, 4).subscribe(
        i -> System.out.println("consuming: " + i),
        error -> System.out.println("error: " + error),
        () -> System.out.println("Stream completed"),
        subscription -> subscription.request(2)
    );
__________________
Output:
consuming: 1
consuming: 2

As you can see in the last example, we used the subscription to ask the publisher for only two events, 1 and 2. Because the publisher never reached completion, it did not emit the completion event, so no Stream completed message was written to the output.
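
If the subscriber wants the rest of the values later, it has to request them. One way to sketch this is with Reactor's BaseSubscriber, which exposes hooks for each event and a request(n) method. The example below assumes reactor-core is on the classpath; the class name StepwiseDemandSketch and the rule "ask for two more after seeing 2" are only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

class StepwiseDemandSketch {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        Flux.range(1, 4).subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(2); // initial demand: only two elements
            }

            @Override
            protected void hookOnNext(Integer value) {
                log.add("consuming: " + value);
                if (value == 2) {
                    request(2); // first batch consumed, ask for two more
                }
            }

            @Override
            protected void hookOnComplete() {
                log.add("Stream completed");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        // Prints consuming: 1 to consuming: 4, then Stream completed
        run().forEach(System.out::println);
    }
}
```

Because the second request covers the remaining elements, the publisher reaches the end of the range and this time the completion event is delivered.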

Reactor operators

As mentioned above, Reactor also provides a large set of operators that can be used to compose readable data pipelines. There are various operators for filtering, mapping, and collecting data. Here are some examples:

  • filter(Predicate predicate): Evaluate each source value against the given Predicate.
  • take(long n): Take only the first N values from this Flux, if available.
  • take(Duration timespan): Relay values from this Flux until the specified Duration elapses.
  • take(Duration timespan, Scheduler timer): Relay values from this Flux until the specified Duration elapses, as measured on the specified Scheduler.
  • takeLast(int n): Emit the last N values this Flux emitted before its completion.
  • takeUntil(Predicate predicate): Relay values from this Flux until the given Predicate matches.
  • skip(count): Skips the specified number of elements from the beginning of the stream.
  • skip(Duration): Skips elements emitted during the given duration from the beginning of the stream.
  • skipLast(count): Skips the specified number of elements from the end of the stream.
  • skipUntil(Predicate predicate): Skips elements until the given Predicate matches for the first time.
  • distinct(): Filters out duplicates, so each distinct element is emitted only once across the whole stream.
  • distinctUntilChanged(): Filters out consecutive duplicates, so an element is emitted only when it differs from the previous one.
  • ignoreElements(): Ignores onNext signals (dropping them) and only propagates termination events.
  • single(): Expects exactly one element from the source and signals an error if the source is empty or emits more than one.
  • elementAt(int index): Emits only the element at the specified index of the stream.

Flux.range(1, 10)
        .filter(i -> i % 2 == 0)
        .map(i -> "number: " + i)
        .takeLast(2)
        .subscribe(System.out::println);
__________________
Output:
number: 8
number: 10
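
The distinct and skip families are easy to mix up, so here is a small comparison on one input. It assumes reactor-core is on the classpath; the class name FilteringOperatorsSketch and the sample values are made up, and collectList().block() is used only to gather the results for the demonstration.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class FilteringOperatorsSketch {

    // Sample input with both consecutive and non-consecutive duplicates
    static Flux<Integer> source() {
        return Flux.just(1, 1, 2, 2, 1, 3);
    }

    static List<Integer> distinct() {
        return source().distinct().collectList().block();             // [1, 2, 3]
    }

    static List<Integer> distinctUntilChanged() {
        return source().distinctUntilChanged().collectList().block(); // [1, 2, 1, 3]
    }

    static List<Integer> skipTwo() {
        return source().skip(2).collectList().block();                // [2, 2, 1, 3]
    }

    static List<Integer> skipLastTwo() {
        return source().skipLast(2).collectList().block();            // [1, 1, 2, 2]
    }

    public static void main(String[] args) {
        System.out.println(distinct());
        System.out.println(distinctUntilChanged());
        System.out.println(skipTwo());
        System.out.println(skipLastTwo());
    }
}
```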

More operators can be found in the Flux and Mono wikis.

Testing reactor publishers

Good developers don’t write code without testing it, right? So, to test reactive publishers, we use StepVerifier. StepVerifier provides a declarative way of creating a verifiable script for an async Publisher sequence. The verification must be triggered after the terminal expectations (completion, error, cancellation) have been declared, by calling one of the verify() methods.

Below is a simple test for the last Flux we wrote above:

Flux<String> flux = Flux.range(1, 10)
        .filter(i -> i % 2 == 0)
        .map(i -> "number: " + i)
        .takeLast(2);

StepVerifier.create(flux)
        .expectNext("number: 8", "number: 10")
        .verifyComplete();

In the StepVerifier above, we first create a StepVerifier from a Flux publisher, then declare our expectations (the values the Flux is supposed to emit), and finally verify that it completes.

If the Flux is expected to terminate with an error, we test it with verifyError:

Flux<Integer> flux = Flux.range(1, 4)
        .map(i -> {
          if (i <= 3) {
            return i;
          }
          throw new RuntimeException("ERROR: got 4");
    });

StepVerifier.create(flux)
        .expectNext(1, 2, 3)
        .verifyError(RuntimeException.class);

An empty Flux can be verified by checking only for completion, without expecting any emitted values:

Flux<Integer> flux = Flux.empty();

StepVerifier.create(flux)
        .verifyComplete();

Testing a Mono works the same way as testing a Flux:

Mono<String> mono = Mono.just("value");

StepVerifier.create(mono)
        .expectNext("value")
        .verifyComplete();
