DEV Community

Cover image for Making reactive applications with a Kitten Care Example
João Esperancinha
João Esperancinha

Posted on

Making reactive applications with a Kitten Care Example

Original Photo

At the beginning of 2020, I was trying to make an example to illustrate the benefits of using a Reactive Programming model.
I already had an idea of an application that I wanted to make and because I love cats, I wanted to make an application to register cats found in the wild.
This way cats could be helped by being redirected to a Cat care house in order to find an available carer.
The idea of making the application reactive was to also be able to make a presentation about it. Unfortunately the presentation never took place and I couldn't continue with it.
However, I was able to finish just in time the project I wanted to make.
In this article I will share with you my experience in creating this application.
I find this to be very helpful if you are looking forward to grasping the intricacies of Reactive Programming.

We will together go through how Reactive programming can make our programming models easier and faster.
What we should know is that Reactive programming relies heavily on the observer pattern.
In short, an observer pattern is one where we have two major players:

  1. A publisher - An entity that manages received events
  2. A subscriber - An entity that subscribes to the publisher and does something with the data generated by the publisher.

The source code for this project can be found here:

alt text Kitten House Care

Reactive programming in current times is viewed as one of the best programming paradigms ever.
However, this concept isn't a new concept.
It already exists since the 1960's under a different name: Data Flows.


1. Introduction


1960's

In order to illustrate the way the concept of Reactive programming started, we have to go back to the 1960's in a time capsule.

At that time Jack Dennis and his graduate team at MIT developed concepts surrounding data and how data could be efficiently transmitted.
The concept invented was the concept of Data flows.
Dataflow programming concepts pioneered in this decade because of Jack Dennis inventive character.

The years where this kind of programming gained momentum was in the years 1974 – 1975.
Along these years, the data stream concept and its interactions were further developed.

The data flow ancestors are:

  1. Asynchronous Digital Logic
  2. Control Structures for Parallel Programming
  3. Abstract Models for Concurrent Systems
  4. Theory of Program Schemes
  5. Structured Programming
  6. Functional Programming

What data flows wanted to show is that data can be emitted as a response to a system.

"For every action, there is an equal and opposite reaction."

Newton's third law

What this means is that the idea was also to find a way to work better with a decentralizad system.

Using the traditional sequential, procedural or control paradigms, represented a problem.
This problem was the blocking nature of such paradigms.
The best way solve this was to parallelize, what could be parallelized.
Therefore, the dataflow paradigm finally was settled to be one where serveral black boxes were connected together.
Connections are made by the use of several inputs.
When all the inputs of one black box are satisfied, then the black box runs.

In 1961, a paper, called " A Block Diagram Compiler ". was published with the development of a new programming language.
BLODI, the new language, short for BLOck DIagram, was the first programming language following the data flow paradigms.
Its application was in electronic circuits.
It was designed by John Larry Kelly Jr. (1923-1965), Carol Lochbaum and Victor A. Vyssotsky (1931-2012)

A presentation from Jack Dennis has been written for the MIT and is available directly on:

alt text
Dennis Talk


2010

In 2010, Eric Meijer, a Computer Scientist working for Microsoft, coined the term Reactive Programming.
Up until this date Eric Meijer was involved inthe development of different .Net technologies.
These included C#, Visual Basic, LINQ and Volta.
The first framework to ever appear which was referred to as Reactive was the Reactive programming framework in 2009.
It was then called a "Watershed Advance in Asynch Programming".
The reason is very simple.
One of the most important paradigms of Reactive programming is its non-blocking nature.
Non-blocking also means that we try to perform separate small tasks independently and asynchronously.
In 2010, the creation of Reactive Extensions for .NET revolutionized the way we look into software development.
Reactive programming, an old concept, was brought to life.
Currently Reactive Extensions for .NET is actually called Reactive X, and it is available in most common programming languages.
Reactive Programming is commonly referred to as Dataflow programming on steroids.
The reason being is that it's just an extension of the Dataflow programming paradigm.

In November 17th of 2009, Erik Meijer was interviewed on Channel 9 about Rx. The video isn't available online anymore.
This interview used to be available online on their website: Erik-Meijer-Rx-in-15-Minutes


Reactive Manifesto

The Reactive Manifesto was created in 2013 by Jonas Bonér.
This manifesto applies to systems and is many times confused with the idea of reactive programmming.
Both reactive programming and reactive systems want decoupling to happen, they want to run as soon as certain conditions are met and they want to resource to parallelism as much as possible.
In other words these two concepts have in common that they want maximum availability and asynchronicity whenever possible.
This is probably where the similarities end.

A reactive system must obey the following rules or at least explore these as maximum as possible:

  1. Responsive: It needs to respond quickly.
    1. The time of the request itself is independent of this.
    2. Getting an ok which confirms the request is ongoing is more important than responding with the request complete.
  2. Resilient: It must respond well and support Back-Pressure
    1. Use controller Messages
    2. Avoid catastrophic failure
  3. Elastic: Automatic Generation of resources
    1. Threads
    2. Memory
    3. CPU
  4. Message Driven: Publisher/Subscriber

Observer Pattern

Is a behavioral design pattern that lets you define a subscription mechanism to notify multiple objects about any events that happen to the object they’re observing.

In other words, we are going to build software with declarative programming instead of imperative programming.


Spring Webflux

Reactive programming basically means that we are programming in a reactive way.
A reactive way, means that calls are not blocked and waiting is something we want to avoid.

From the Mono documentation we get the following:

A Mono is a specialized Publisher that emits at most one item and then (optionally) terminates with an onComplete signal or an onError signal.

in Mono documentation

This means that, and we'll see this later in cation, that Monos are valid for single elements.
This is important to keep in mind because we also want to make sure we can, in some way a stream of elements.
In our example we'll use this to transmit one single Cat at a time.

From the Flux documentaion we have that:

A Flux is a standard Publisher that represents an asynchronous sequence of 0 to N emitted items, optionally terminated by either a completion signal or an error. As in the Reactive Streams spec, these three types of signal translate to calls to a downstream Subscriber’s onNext, onComplete, and onError methods.

in Flux documentaion

In other words this means that we'll use Flux to transmit a list of items.
In our case, a list of CatDtos.


2. Environment Setup

The project we will be looking at is developed in Java 14.

If you have questions about setting up Java 14, then please lookup more information on my Hints&Tricks document.

To be able o run the unit tests for the Kitten Care Project we need a special configuration.
Without it, our Blockhound release version 1.0.3-RELEASE will give this error back:

/Users/jofisaes/.sdkman/candidates/java/14.0.0.hs-adpt/bin/java -ea -Didea.test.cyclic.buffer.size=1048576 -javaagent:/Applications/IntelliJ IDEA 2020.2 EAP.app/Contents/lib/idea_rt.jar=52479:/Applications/IntelliJ IDEA 2020.2 EAP.app/Contents/bin -Dfile.encoding=UTF-8 -classpath /Applications/IntelliJ IDEA 2020.2 EAP.app/Contents/lib/idea_rt.jar:/Users/jofisaes/.m2/repository/org/junit/platform/junit-platform-launcher/1.5.2/junit-platform-launcher-1.5.2.jar:/Applications/IntelliJ IDEA 2020.2 EAP.app/Contents/plugins/junit/lib/junit5-rt.jar:/Applications/IntelliJ IDEA 2020.2 EAP.app/Contents/plugins/junit/lib/junit-rt.jar:/Users/jofisaes/dev/src/jofisaes/kitten-house-care-parent/kitten-house-care/target/test-classes:/Users/jofisaes/dev/src/jofisaes/kitten-house-care-parent/kitten-house-care/target/classes:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-starter-webflux/2.2.6.RELEASE/spring-boot-starter-webflux-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-starter/2.2.6.RELEASE/spring-boot-starter-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot/2.2.6.RELEASE/spring-boot-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/2.2.6.RELEASE/spring-boot-autoconfigure-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-starter-logging/2.2.6.RELEASE/spring-boot-starter-logging-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar:/Users/jofisaes/.m2/repository/ch/qos/logback/logback-core/1.2.3/logback-core-1.2.3.jar:/Users/jofisaes/.m2/repository/org/apache/logging/log4j/log4j-to-slf4j/2.12.1/log4j-to-slf4j-2.12.1.jar:/Users/jofisaes/.m2/repository/org/apache/logging/log4j/log4j-api/2.12.1/log4j-api-2.12.1.jar:/Users/jofisaes/.m2/repository/org/slf4j/jul-to-slf4j/1.7.30/jul-to-slf4j-1.7.30.jar:/Users/jofisaes/.m2/repository/jakarta/annotation/jakarta.annotation-api/1.3.5/jakarta.annotation-api-1.3.5.jar:/Users/jofisaes/.m2/repository/org/yaml/snakeyaml/1.25/snakeyaml-1.25.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-starter-json/2.2.6.RELEASE/spring-boot-starter-json-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.10.3/jackson-databind-2.10.3.jar:/Users/jofisaes/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.10.3/jackson-annotations-2.10.3.jar:/Users/jofisaes/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.10.3/jackson-core-2.10.3.jar:/Users/jofisaes/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jdk8/2.10.3/jackson-datatype-jdk8-2.10.3.jar:/Users/jofisaes/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jsr310/2.10.3/jackson-datatype-jsr310-2.10.3.jar:/Users/jofisaes/.m2/repository/com/fasterxml/jackson/module/jackson-module-parameter-names/2.10.3/jackson-module-parameter-names-2.10.3.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-starter-reactor-netty/2.2.6.RELEASE/spring-boot-starter-reactor-netty-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/io/projectreactor/netty/reactor-netty/0.9.6.RELEASE/reactor-netty-0.9.6.RELEASE.jar:/Users/jofisaes/.m2/repository/io/netty/netty-codec-http/4.1.48.Final/netty-codec-http-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-common/4.1.48.Final/netty-common-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-buffer/4.1.48.Final/netty-buffer-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-transport/4.1.48.Final/netty-transport-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-codec/4.1.48.Final/netty-codec-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-codec-http2/4.1.48.Final/netty-codec-http2-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-handler/4.1.48.Final/netty-handler-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-resolver/4.1.48.Final/netty-resolver-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-handler-proxy/4.1.48.Final/netty-handler-proxy-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-codec-socks/4.1.48.Final/netty-codec-socks-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-transport-native-epoll/4.1.48.Final/netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:/Users/jofisaes/.m2/repository/io/netty/netty-transport-native-unix-common/4.1.48.Final/netty-transport-native-unix-common-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/org/glassfish/jakarta.el/3.0.3/jakarta.el-3.0.3.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-starter-validation/2.2.6.RELEASE/spring-boot-starter-validation-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/jakarta/validation/jakarta.validation-api/2.0.2/jakarta.validation-api-2.0.2.jar:/Users/jofisaes/.m2/repository/org/hibernate/validator/hibernate-validator/6.0.18.Final/hibernate-validator-6.0.18.Final.jar:/Users/jofisaes/.m2/repository/org/jboss/logging/jboss-logging/3.4.1.Final/jboss-logging-3.4.1.Final.jar:/Users/jofisaes/.m2/repository/com/fasterxml/classmate/1.5.1/classmate-1.5.1.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-web/5.2.5.RELEASE/spring-web-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-beans/5.2.5.RELEASE/spring-beans-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-webflux/5.2.5.RELEASE/spring-webflux-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/io/projectreactor/reactor-core/3.3.4.RELEASE/reactor-core-3.3.4.RELEASE.jar:/Users/jofisaes/.m2/repository/org/reactivestreams/reactive-streams/1.0.3/reactive-streams-1.0.3.jar:/Users/jofisaes/.m2/repository/org/synchronoss/cloud/nio-multipart-parser/1.1.0/nio-multipart-parser-1.1.0.jar:/Users/jofisaes/.m2/repository/org/slf4j/slf4j-api/1.7.30/slf4j-api-1.7.30.jar:/Users/jofisaes/.m2/repository/org/synchronoss/cloud/nio-stream-storage/1.1.3/nio-stream-storage-1.1.3.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-starter-web/2.2.6.RELEASE/spring-boot-starter-web-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-starter-tomcat/2.2.6.RELEASE/spring-boot-starter-tomcat-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/org/apache/tomcat/embed/tomcat-embed-core/9.0.33/tomcat-embed-core-9.0.33.jar:/Users/jofisaes/.m2/repository/org/apache/tomcat/embed/tomcat-embed-el/9.0.33/tomcat-embed-el-9.0.33.jar:/Users/jofisaes/.m2/repository/org/apache/tomcat/embed/tomcat-embed-websocket/9.0.33/tomcat-embed-websocket-9.0.33.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-webmvc/5.2.5.RELEASE/spring-webmvc-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-aop/5.2.5.RELEASE/spring-aop-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-context/5.2.5.RELEASE/spring-context-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-expression/5.2.5.RELEASE/spring-expression-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/org/projectlombok/lombok/1.18.12/lombok-1.18.12.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-starter-test/2.2.6.RELEASE/spring-boot-starter-test-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-test/2.2.6.RELEASE/spring-boot-test-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-test-autoconfigure/2.2.6.RELEASE/spring-boot-test-autoconfigure-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/com/jayway/jsonpath/json-path/2.4.0/json-path-2.4.0.jar:/Users/jofisaes/.m2/repository/net/minidev/json-smart/2.3/json-smart-2.3.jar:/Users/jofisaes/.m2/repository/net/minidev/accessors-smart/1.2/accessors-smart-1.2.jar:/Users/jofisaes/.m2/repository/org/ow2/asm/asm/5.0.4/asm-5.0.4.jar:/Users/jofisaes/.m2/repository/jakarta/xml/bind/jakarta.xml.bind-api/2.3.3/jakarta.xml.bind-api-2.3.3.jar:/Users/jofisaes/.m2/repository/jakarta/activation/jakarta.activation-api/1.2.2/jakarta.activation-api-1.2.2.jar:/Users/jofisaes/.m2/repository/org/junit/vintage/junit-vintage-engine/5.5.2/junit-vintage-engine-5.5.2.jar:/Users/jofisaes/.m2/repository/junit/junit/4.12/junit-4.12.jar:/Users/jofisaes/.m2/repository/org/mockito/mockito-junit-jupiter/3.1.0/mockito-junit-jupiter-3.1.0.jar:/Users/jofisaes/.m2/repository/org/hamcrest/hamcrest/2.1/hamcrest-2.1.jar:/Users/jofisaes/.m2/repository/org/mockito/mockito-core/3.1.0/mockito-core-3.1.0.jar:/Users/jofisaes/.m2/repository/net/bytebuddy/byte-buddy/1.10.8/byte-buddy-1.10.8.jar:/Users/jofisaes/.m2/repository/net/bytebuddy/byte-buddy-agent/1.10.8/byte-buddy-agent-1.10.8.jar:/Users/jofisaes/.m2/repository/org/objenesis/objenesis/2.6/objenesis-2.6.jar:/Users/jofisaes/.m2/repository/org/skyscreamer/jsonassert/1.5.0/jsonassert-1.5.0.jar:/Users/jofisaes/.m2/repository/com/vaadin/external/google/android-json/0.0.20131108.vaadin1/android-json-0.0.20131108.vaadin1.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-core/5.2.5.RELEASE/spring-core-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-jcl/5.2.5.RELEASE/spring-jcl-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-test/5.2.5.RELEASE/spring-test-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/org/xmlunit/xmlunit-core/2.6.4/xmlunit-core-2.6.4.jar:/Users/jofisaes/.m2/repository/io/projectreactor/tools/blockhound-junit-platform/1.0.3.RELEASE/blockhound-junit-platform-1.0.3.RELEASE.jar:/Users/jofisaes/.m2/repository/io/projectreactor/tools/blockhound/1.0.3.RELEASE/blockhound-1.0.3.RELEASE.jar:/Users/jofisaes/.m2/repository/org/junit/jupiter/junit-jupiter/5.5.2/junit-jupiter-5.5.2.jar:/Users/jofisaes/.m2/repository/org/junit/jupiter/junit-jupiter-api/5.5.2/junit-jupiter-api-5.5.2.jar:/Users/jofisaes/.m2/repository/org/opentest4j/opentest4j/1.2.0/opentest4j-1.2.0.jar:/Users/jofisaes/.m2/repository/org/junit/platform/junit-platform-commons/1.5.2/junit-platform-commons-1.5.2.jar:/Users/jofisaes/.m2/repository/org/junit/jupiter/junit-jupiter-params/5.5.2/junit-jupiter-params-5.5.2.jar:/Users/jofisaes/.m2/repository/org/junit/jupiter/junit-jupiter-engine/5.5.2/junit-jupiter-engine-5.5.2.jar:/Users/jofisaes/.m2/repository/org/apiguardian/apiguardian-api/1.1.0/apiguardian-api-1.1.0.jar:/Users/jofisaes/.m2/repository/org/junit/platform/junit-platform-engine/1.5.2/junit-platform-engine-1.5.2.jar:/Users/jofisaes/.m2/repository/org/assertj/assertj-core/3.13.2/assertj-core-3.13.2.jar com.intellij.rt.junit.JUnitStarter -ideVersion5 -junit5 org.jesperancinha.housing.controller.CatControllerImplTest,getFullAllCatsNonReacive_whenCall_Blocking
OpenJDK 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended
Exception in thread "main" java.util.ServiceConfigurationError: org.junit.platform.launcher.TestExecutionListener: Provider reactor.blockhound.junit.platform.BlockHoundTestExecutionListener could not be instantiated
    at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:584)
    at java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:806)
    at java.base/java.util.ServiceLoader$ProviderImpl.get(ServiceLoader.java:724)
    at java.base/java.util.ServiceLoader$3.next(ServiceLoader.java:1396)
    at java.base/java.lang.Iterable.forEach(Iterable.java:74)
    at org.junit.platform.launcher.core.LauncherFactory.create(LauncherFactory.java:94)
    at org.junit.platform.launcher.core.LauncherFactory.create(LauncherFactory.java:67)
    at com.intellij.junit5.JUnit5IdeaTestRunner.createListeners(JUnit5IdeaTestRunner.java:46)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:31)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: java.lang.ExceptionInInitializerError
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
    at java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:782)
    ... 9 more
Caused by: java.lang.IllegalStateException: The instrumentation have failed.
It looks like you're running on JDK 13+.
You need to add '-XX:+AllowRedefinitionToAddDeleteMethods' JVM flag.
See https://github.com/reactor/BlockHound/issues/33 for more info.
    at reactor.blockhound.BlockHound$Builder.testInstrumentation(BlockHound.java:435)
    at reactor.blockhound.BlockHound$Builder.install(BlockHound.java:401)
    at reactor.blockhound.BlockHound.install(BlockHound.java:94)
    at reactor.blockhound.junit.platform.BlockHoundTestExecutionListener.<clinit>(BlockHoundTestExecutionListener.java:19)
    ... 15 more

Process finished with exit code 1
Enter fullscreen mode Exit fullscreen mode

As it mentions in the exception we only need to add this flag in our IntelliJ (or whatever IDE you are using, and of course this is also valid via de command line): -XX:+AllowRedefinitionToAddDeleteMethods.

Bear in mind a few important things about Blockhound:

  1. It is still a test tool in the making.
  2. Version 1.0.3-RELEASE work very well with Java 14
  3. Blockhound will detect blocking calls which makes simulation of pausing difficult because methods like wait and Thread.sleep will block the main Thread.
  4. Blocking of the main thread always results in a blocking thread detection. Making a unit test to check the response data will fail.
  5. During a simple Blockhoud test, the simulated blocking calls outside the callback scope aren't going to be detected.

3. Implementation

To implement with WebFlux, we need to start with dependencies.
In this case we are going to use Maven:

We need of course the WebFlux library:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>2.2.6.RELEASE</version>
</dependency>
Enter fullscreen mode Exit fullscreen mode

Then we need the framework to run our application and get support for our MVC pattern:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.2.6.RELEASE</version>
</dependency>
Enter fullscreen mode Exit fullscreen mode

Having these dependencies in our maven repository configuration is what's important.
In our case we are using the maven compile plugin in order to compile our project with the JDK 14:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>14</source>
                <target>14</target>
            </configuration>
        </plugin>
    </plugins>
</build>
Enter fullscreen mode Exit fullscreen mode

Let's take a top to bottom strategy.
First we implement our REST services.
This is all implement in our Controller class CatController.
Our application will follow the typical MVC model, but instead of the traditional MVC way, we will implement it in a reactive way.

First we need a data model and for that let's have a look at our first kitten:

{
  "id": 1,
  "name": "Lania",
  "color": "orange",
  "species": "Katachtigen",
  "pattern": "stripped",
  "age": 4,
  "formerOwners": [
    1
  ],
  "careCenters": [
    1
  ]
}
Enter fullscreen mode Exit fullscreen mode

What we have here, is the name "Lania" for this cat.
The way to interpret this data is:

  1. We have a cat named Mountain.
  2. The cats color is orange.
  3. The cat species is Katachtigen.
  4. The hair pattern is stripped.
  5. The cat is 4 years old
  6. The cat never had any owners.
{
  "id": 2,
  "name": "Mit",
  "color": "black and white",
  "species": "Katachtigen",
  "pattern": "spotted",
  "age": 9,
  "formerOwners": [
    2
  ],
  "careCenters": [
    1
  ]
}
Enter fullscreen mode Exit fullscreen mode

And this is another cat called Mit. Mat is another cat who differs in that:

  1. Mat is 9 years old
  2. He is Black and White
  3. The pattern on his hair is spotted.

We also need the data of the care center for the cats.
This is one example.
This a list of centers.
In our case, there is only one element in it:

[
  {
    "id": 1,
    "name": "Nieuwegein Kitten Center",
    "address": "Kittenstraat",
    "refNumber": "23ABC",
    "city": "Nieuwegein",
    "postCode": "9999CC",
    "country": "Nederland"
  }
]
Enter fullscreen mode Exit fullscreen mode

Finally, we want the information of potential owners.
We make also an array of available owners:

[
  {
    "id": 1,
    "name": "Eng. João Esperancinha",
    "address": "Verrekijkers",
    "rating": 10
  },
  {
    "id": 2,
    "name": "Eng. Erwin Zieligofski",
    "address": "The swamp",
    "rating": 0
  }
]
Enter fullscreen mode Exit fullscreen mode

Here we have two owners.
Top keep it simple, we are only interested in three properties for these owners.
These are their name, adress and rating.
So, we can see here that the best owner is of course João Esperancinha and the bad owner is Erwin Zieligovsky.

Now that we have some more understanding of the data we are going to need, we can now have a look at the REST implementation.
This is an interface class to our REST controller:

@RestController
@RequestMapping("/cats")
public interface CatController {
    @GetMapping
    Flux<CatDto> getAllCats();
    @GetMapping("/full")
    Flux<CatDto> getFullAllCats();
    @GetMapping("/{catId}")
    Mono<CatDto> getCatByIdI(@PathVariable Long catId);
    @GetMapping("/full/{catId}")
    Mono<CatDto> getFullCatById(@PathVariable Long catId);
}
Enter fullscreen mode Exit fullscreen mode

We will read data from our cats database via path /cats.
Here we have the two publisher types available in the Spring Webflux library.
These are Mono and Flux.

We'll look into Mono and Flux in detail letter.
We are now only concerned in getting the architecture all together.

For our application, we just need to know that we want to get a complete list of available cats, a cat by id and these last two methods with complete data.

Let's now have a look at the REST service implementation interface:

@RestController
@RequestMapping("/cats")
public class CatControllerImpl implements CatController {
    private final CatService catService;
    public CatControllerImpl(CatService catService) {
        this.catService = catService;
    }
    public Flux<CatDto> getAllCats() {
        return this.catService.getAllCats();
    }
    public Flux<CatDto> getFullAllCats() {
        return this.catService.getFullAllCats();
    }
    public Mono<CatDto> getCatByIdI(Long catId) {
        return this.catService.getCatById(catId);
    }
    public Mono<CatDto> getFullCatById(Long catId) {
        return this.catService.getFullCatById(catId);
    }
}
Enter fullscreen mode Exit fullscreen mode

The implementation of the RestController doesn't differ that much from the implementation of a common MVC REST controller.
We can see that the difference lies in the fact that instead of returning something like a ResponseEntity, we are instead return our publishers.
All of our REST controller methods are calling methods of the CatService. The CatService is where our business logic lies.
For the current application, the business logic isn't really that much and works more like a mapper between the DTO's and the data objects.

Our CatService interface is very simple:

public interface CatService {
    Mono<CatDto> getCatById(Long id);
    Mono<CatDto> getFullCatById(Long id);
    Flux<CatDto> getFullAllCats();
    Flux<CatDto> getAllCats();
}
Enter fullscreen mode Exit fullscreen mode

Its implementation is only a bit more complictated:

@Service
public class CatServiceImpl implements CatService {

    private final CatRepository catRepository;

    private final OwnerRepository ownerRepository;

    private final CareCenterRepository careCenterRepository;

    public CatServiceImpl(CatRepository catRepository, OwnerRepository ownerRepository,
        CareCenterRepository careCenterRepository) {
        this.catRepository = catRepository;
        this.ownerRepository = ownerRepository;
        this.careCenterRepository = careCenterRepository;
    }

    @Override
    public Mono<CatDto> getCatById(Long id) {
        return catRepository.getCatById(id).map(CatConverter::toDto);
    }

    @Override
    public Mono<CatDto> getFullCatById(Long id) {
        return catRepository.getCatById(id).map(cat -> {
            CatDto catDto = CatConverter.toDto(cat);
            return Mono.zip(ownerRepository.getOwnersByIds(cat.getFormerOwners())
                .map(owners -> {
                    owners.forEach(owner -> catDto.getFormerOwners().add(OwnerConverter.toDto(owner)));
                    return owners;
                })
                .subscribeOn(Schedulers.parallel()), careCenterRepository.getCareCentersByIds(cat.getCareCenters())
                .map(careCenters -> {
                    careCenters
                        .forEach(careCenter -> catDto.getCareCenters().add(CareCenterConverter.toDto(careCenter)));
                    return careCenters;
                })
                .subscribeOn(Schedulers.parallel()), (owners, cares) -> catDto)
                .subscribeOn(Schedulers.parallel());
        }).flatMap(Mono::from).subscribeOn(Schedulers.parallel());
    }

    @Override
    public Flux<CatDto> getFullAllCats() {
        return Flux.merge(getFullCatById(1L), getFullCatById(2L));
    }

    @Override
    public Flux<CatDto> getAllCats() {
        return Flux.merge(getCatById(1L), getCatById(2L));
    }
}
Enter fullscreen mode Exit fullscreen mode

On this occasion, the complexity of our implementations has increased considerably.
The reason being is that our REST controller is dependent of a few separate database entities.
It is important to remind ourselves that we don't actually have a database.
The database is being purely emulated by the use of the file system with static data.
This is also the reason why some id's have been hardcoded.

Before looking at what the CatService can give us. let's look at all the repositories we are going to use.
Cats will be managed via CareCenters and will have a sort of legal guardian registered as an Owner.

For the care centers we first define our interface:

public interface CareCenterRepository {
    Mono<CareCenter> getCareCenterById(Long id);
    Mono<List<CareCenter>> getCareCentersByIds(List<Long> careCenters);
}
Enter fullscreen mode Exit fullscreen mode

And then we have our implementation:

@Repository
public class CareCenterRepositoryImpl implements CareCenterRepository {

    private final CareCenter[] careCenters;

    public CareCenterRepositoryImpl(ObjectMapper objectMapper) throws IOException {
        this.careCenters = objectMapper
            .readValue(getClass().getResourceAsStream("/carecenters.json"), CareCenter[].class);

    }

    @Override
    public Mono<CareCenter> getCareCenterById(Long id) {
        return Mono.fromCallable(
            () -> Arrays.stream(careCenters).filter(careCenter -> careCenter.getId().equals(id)).findFirst()
                .orElse(null));
    }

    @Override
    public Mono<List<CareCenter>> getCareCentersByIds(List<Long> careCenteIds) {
        return Mono.fromCallable(
            () -> Arrays.stream(careCenters).filter(careCenter -> careCenteIds.contains(careCenter.getId()))
                .collect(Collectors.toList()));
    }
}
Enter fullscreen mode Exit fullscreen mode

We create our repository just like all the other repositories.
On a first step we initialize our cached data by reading the data files in resources.
Then we are left with two method: getCareCenterById and getCareCentersByIds.

We have mentioned before that Flux stands for lists.
Our getCareCentersByIds doesn't return a Flux though.
Instead, it returns a Mono of a List of care centers.
This is important to understand.
We could have had returned it this way:

@Override
public Flux<CareCenter> getCareCentersByIds(List<Long> careCenteIds) {
    return Flux.fromStream(
        () -> Arrays.stream(careCenters).filter(careCenter -> careCenteIds.contains(careCenter.getId())));
}
Enter fullscreen mode Exit fullscreen mode

This way we would have gotten a Flux of CareCenter's.
We will have this more clear further down this article.
For the moment, we just need to know that a Flux works pretty much as a stream, which, dependeding on what to do, can make some zip operations much more complicated than they should be.
Furthermore, it does not add any performance value some of these cases and in others, the zip operation is just impossible.

Let's move on to our Repository interface:

public interface OwnerRepository {
    Mono<Owner> getOwnerById(Long id);
    Mono<List<Owner>> getOwnersByIds(List<Long> formerOwners);
}
Enter fullscreen mode Exit fullscreen mode

For the same reasons we said for the CareCenterRepository, we are using a Mono of a List here instead of a Flux.
This is its implementation:

@Repository
public class OwnerRepositoryImpl implements OwnerRepository {
    private final Owner[] owners;
    public OwnerRepositoryImpl(ObjectMapper objectMapper) throws IOException {
        this.owners = objectMapper.readValue(getClass().getResourceAsStream("/owners.json"), Owner[].class);
    }
    @Override
    public Mono<Owner> getOwnerById(Long id) {
        return Mono.fromCallable(
            () -> Arrays.stream(owners).filter(owner -> owner.getId().equals(id)).findFirst().orElse(null));
    }
    @Override
    public Mono<List<Owner>> getOwnersByIds(List<Long> formerOwners) {
        return Mono.fromCallable(() -> Arrays.stream(owners).filter(owner -> formerOwners.contains(owner.getId())).collect(
            Collectors.toList()));
    }
}
Enter fullscreen mode Exit fullscreen mode

Finally we need top just have a quick look at the CatRepository:

public interface CatRepository {
    Mono<Cat> getCatById(Long id);
}
Enter fullscreen mode Exit fullscreen mode

And its implementation:


@Repository
public class CatRepositoryImpl implements CatRepository {
    private final Cat cat1;
    private final Cat cat2;
    public CatRepositoryImpl(ObjectMapper objectMapper) throws IOException {
        this.cat1 = objectMapper.readValue(getClass().getResourceAsStream("/cat1.json"), Cat.class);
        this.cat2 = objectMapper.readValue(getClass().getResourceAsStream("/cat2.json"), Cat.class);
    }
    @Override
    public Mono<Cat> getCatById(Long id) {
        return Mono.fromCallable(() -> {
            if (id.intValue() == 1L) {
                return cat1;
            }
            if (id.intValue() == 2L) {
                return cat2;
            }
            return null;
        });
    }
}
Enter fullscreen mode Exit fullscreen mode

This is an extremely hardcoded repository, which is done so, just to support our tests and experiments.
Our repository implementation provides only one method: getCatById.
With this method we can get the cats we want by id.

Looking back at the CatServiceImpl we now know enough to start analysing this class.

Our service provides methods that are important to unerstand WebFlux.
We can start by checking method getCatById:

@Override
public Mono<CatDto> getCatById(Long id) {
    return catRepository.getCatById(id).map(CatConverter::toDto);
}
Enter fullscreen mode Exit fullscreen mode

What we are doing in this method is just making sure that we separate the back-end layer from the layer facing the front-end.
Just as in tradition MVC architectures, we separate our DTO (Data Transfer Object) from the persistence layer.
In our specific case, the conversion seems poinless, but this is because we are not transforming the data that much.
Normally, for more complicated cases, the data streamed to the public domain is usually either filtered, enriched, or in any other way changed.

For this case, we can clearly see that we are getting a Cat.
At least we will be getting a Cat in the front end.
However, this isn't exactly the case.
Although we eventually get a Cat in the front end, our Cat is going to be published to the front-end.
We'll see this in further detail along this article.

For now let's have a better look at what's happening in the back-end in our repoitory layer:

@Override
public Mono<Cat> getCatById(Long id) {
    return Mono.fromCallable(() -> {
        if (id.intValue() == 1L) {
            return cat1;
        }
        if (id.intValue() == 2L) {
            return cat2;
        }
        return null;
    });
}
Enter fullscreen mode Exit fullscreen mode

We are returning a Mono of a cat and not the cat iself.
What this means is that, we are returning an implementation of a publishing function in the callback of the REST call.
By doing this, we are ensuring that, this hardcoded if, else statements are only executed after the callback.

We are now going to have a look at a zip method provided by Flux:

@Override
public Mono<CatDto> getFullCatById(Long id) {
    return catRepository.getCatById(id).map(cat -> {
        CatDto catDto = CatConverter.toDto(cat);
        return Mono.zip(ownerRepository.getOwnersByIds(cat.getFormerOwners())
            .map(owners -> {
                owners.forEach(owner -> catDto.getFormerOwners().add(OwnerConverter.toDto(owner)));
                return owners;
            })
            .subscribeOn(Schedulers.parallel()), careCenterRepository.getCareCentersByIds(cat.getCareCenters())
            .map(careCenters -> {
                careCenters
                    .forEach(careCenter -> catDto.getCareCenters().add(CareCenterConverter.toDto(careCenter)));
                return careCenters;
            })
            .subscribeOn(Schedulers.parallel()), (owners, cares) -> catDto)
            .subscribeOn(Schedulers.parallel());
    }).flatMap(Mono::from).subscribeOn(Schedulers.parallel());
}
Enter fullscreen mode Exit fullscreen mode

This is method getFullCatById. We are getting a full cat!
A full cat in this code means that we'll get a cat by doing three essential requests in a reactive way:

  1. Collect the Cat data
  2. Collect the Owner data
  3. Collect the CatCenter data

Points 2. and 3. need to be run in a parallel way.
However both of them depend on the Cat data which we should get first.

This is how we want to do this:

blogcenter

The original web framework included in the Spring Framework, Spring Web MVC, was purpose-built for the Servlet API and Servlet containers. The reactive-stack web framework, Spring WebFlux, was added later in version 5.0. It is fully non-blocking, supports Reactive Streams back pressure, and runs on such servers as Netty, Undertow, and Servlet 3.1+ containers.
Both web frameworks mirror the names of their source modules (spring-webmvc and spring-webflux) and co-exist side by side in the Spring Framework. Each module is optional. Applications can use one or the other module or, in some cases, both — for example, Spring MVC controllers with the reactive WebClient.

in Web on Reactive Stack

The zip method from Mono, allows to start two publishing callbacks separately in parallel.
For this to happen we can use many types in many different cases.
For our case we are going to zip two Mono's together.

The first step is to get our Cat out of the database.
Using instruction catRepository.getCatById(id).map(cat -> { we get our cat from the database.
Recalling from the JSON data we are using for our database, we know that out cats only have references to their owners and care centers.
This is exactly what in a real life ER model database happens.
In our example, for some reason, this data is allocated in different systems and we cannot benefit from things like @ManyToOne, @ManyToMany or @OneToMany.
These decorators are only available via Hibernate and they work very well in single databases, but we cannot used them if we consider a multi tiered system.
Now that we have our Cat, we can now benefit from the zip method.

For this we call return Mono.zip(ownerRepository.getOwnersByIds(cat.getFormerOwners()).
Now we have to add two Monos as parameters.
These two Mono's plus the Mono where they are being called from, form a composed Mono, which can be handle a a single Mono publisher as a returned instance.

Our first Mono is this:

ownerRepository.getOwnersByIds(cat.getFormerOwners())
.map(owners -> {
    owners.forEach(owner -> catDto.getFormerOwners().add(OwnerConverter.toDto(owner)));
    return owners;
})
.subscribeOn(Schedulers.parallel())
Enter fullscreen mode Exit fullscreen mode

In this method we are letting the running thread know that we are going to get all the Owner's and create DTO's with them.
We are also letting it know that when Netty subscribes to them, it will subscribe on parallel Schedulers.
This is what thee Schedulers.parallel() instruction is for.
We add all the Owner's that we find to our catDto we want to return.

The second Mono follows that same pattern and we use it to get our CareCenter's.

careCenterRepository.getCareCentersByIds(cat.getCareCenters())
.map(careCenters -> {
    careCenters
        .forEach(careCenter -> catDto.getCareCenters().add(CareCenterConverter.toDto(careCenter)));
    return careCenters;
})
.subscribeOn(Schedulers.parallel())
Enter fullscreen mode Exit fullscreen mode

We add all the CareCenter's that we find to our catDto we want to return.

Finally we instruct the result Mono to also run in parallel:

.subscribeOn(Schedulers.parallel()), (owners, cares) -> catDto).subscribeOn(Schedulers.parallel());
Enter fullscreen mode Exit fullscreen mode

Finally, maybe you have noticed it or maybe not, but we would be actually returning a Mono<Mono<CatDto> publisher instead of what we want.
This is because of our zip call that is running within yet another Mono.

Webflux has a very interesting way of dealing with this and that is running the:

 }).flatMap(Mono::from).subscribeOn(Schedulers.parallel());
Enter fullscreen mode Exit fullscreen mode

By using flatMap, we are making sure that our Mono<Mono<CatDto> is optimized and converted to a Mono<Mono<CatDto instance.

At the end of our CatServiceImpl we still have the following two methods:

@Override
public Flux<CatDto> getFullAllCats() {
    return Flux.merge(getFullCatById(1L), getFullCatById(2L));
}

@Override
public Flux<CatDto> getAllCats() {
    return Flux.merge(getCatById(1L), getCatById(2L));
}
Enter fullscreen mode Exit fullscreen mode

This is an illustration of a possible use of the method merge of the Flux publisher.
This method receives an array in the form of a varargs argument.
This way, we are allowed to pass several Mono's to this method.
The result is a Flux.
On this occasion we definitelly make use of the Flux in its most common form which is as a stream of different objects.


4. REST endpoints test

Let's have a quick run over what we have studyied so far.


4.1 Returning a simple Mono

We have seen this in method getCatById of the CatServiceImpl:

curl http://localhost:8080/cats/1
Enter fullscreen mode Exit fullscreen mode
{
  "name": "Lania",
  "color": "orange",
  "species": "Katachtigen",
  "pattern": "stripped",
  "age": 4,
  "formerOwners": [],
  "careCenters": []
}
Enter fullscreen mode Exit fullscreen mode

Let's try for another cat:

curl http://localhost:8080/cats/1
Enter fullscreen mode Exit fullscreen mode
{
  "name": "Mit",
  "color": "black and white",
  "species": "Katachtigen",
  "pattern": "spotted",
  "age": 9,
  "formerOwners": [],
  "careCenters": []
}
Enter fullscreen mode Exit fullscreen mode

4.2 Returning the complete cat info

Let's now see what our full method, getFullCatById returns for Cat 1:

curl http://localhost:8080/cats/full/1
Enter fullscreen mode Exit fullscreen mode
{
  "name": "Lania",
  "color": "orange",
  "species": "Katachtigen",
  "pattern": "stripped",
  "age": 4,
  "formerOwners": [
    {
      "name": "Eng. João Esperancinha",
      "addres": "Verrekijkers"
    }
  ],
  "careCenters": [
    {
      "name": "Nieuwegein Kitten Center",
      "address": "Kittenstraat",
      "refNumber": "23ABC",
      "city": "Nieuwegein",
      "postCode": "9999CC",
      "country": "Nederland"
    }
  ]
}
Enter fullscreen mode Exit fullscreen mode

And now for cat 2:

curl http://localhost:8080/cats/full/2
Enter fullscreen mode Exit fullscreen mode
{
  "name": "Mit",
  "color": "black and white",
  "species": "Katachtigen",
  "pattern": "spotted",
  "age": 9,
  "formerOwners": [
    {
      "name": "Eng. Erwin Zieligofski",
      "addres": "The swamp"
    }
  ],
  "careCenters": [
    {
      "name": "Nieuwegein Kitten Center",
      "address": "Kittenstraat",
      "refNumber": "23ABC",
      "city": "Nieuwegein",
      "postCode": "9999CC",
      "country": "Nederland"
    }
  ]
}
Enter fullscreen mode Exit fullscreen mode

4.3 All cats

First we look at the simple way of getting the Cat's info using method getAllCats:

curl http://localhost:8080/cats 
Enter fullscreen mode Exit fullscreen mode
[
  {
    "name": "Lania",
    "color": "orange",
    "species": "Katachtigen",
    "pattern": "stripped",
    "age": 4,
    "formerOwners": [],
    "careCenters": []
  },
  {
    "name": "Mit",
    "color": "black and white",
    "species": "Katachtigen",
    "pattern": "spotted",
    "age": 9,
    "formerOwners": [],
    "careCenters": []
  }
]
Enter fullscreen mode Exit fullscreen mode

We get no info about the owners, nor do we get info about the care centers.
Now let's see what happend with getFullAllCats:

curl http://localhost:8080/cats/full  
Enter fullscreen mode Exit fullscreen mode
[
  {
    "name": "Mit",
    "color": "black and white",
    "species": "Katachtigen",
    "pattern": "spotted",
    "age": 9,
    "formerOwners": [
      {
        "name": "Eng. Erwin Zieligofski",
        "addres": "The swamp"
      }
    ],
    "careCenters": [
      {
        "name": "Nieuwegein Kitten Center",
        "address": "Kittenstraat",
        "refNumber": "23ABC",
        "city": "Nieuwegein",
        "postCode": "9999CC",
        "country": "Nederland"
      }
    ]
  },
  {
    "name": "Lania",
    "color": "orange",
    "species": "Katachtigen",
    "pattern": "stripped",
    "age": 4,
    "formerOwners": [
      {
        "name": "Eng. João Esperancinha",
        "addres": "Verrekijkers"
      }
    ],
    "careCenters": [
      {
        "name": "Nieuwegein Kitten Center",
        "address": "Kittenstraat",
        "refNumber": "23ABC",
        "city": "Nieuwegein",
        "postCode": "9999CC",
        "country": "Nederland"
      }
    ]
  }
]
Enter fullscreen mode Exit fullscreen mode

5. A forged identity case

Let's picture this case in the form of a story:


Erwin Zieligovsky was a team leader in a production center.
One of the things that Erwin hated the most was being called Edwin.
Do you see the difference? It's small d.
This d would frequently enrage Erwin, up to the point that with the powers given to him, he would be responsible for people getting fired.
All of this because of that "offensive d instead of the "heavenly" r.
The Erwin was also known for having fits and rage attacks, especially around cats.
Erwin hated cats.
The curious thing though is that Erwin loved to hate cats and loved to show superiority even to cats.
The good thing though, is that the Cat care houses knew about Erwin and so he was always denied access when applying to adopt a cat.
One day, when renewing his identity card, he realized that the government had made a mistake.
Erwin was now called Edwin!
It was in his identiy card!
After fits of anger, Erwin thought about one thing.
With the new identity card, he would get another chance of adopting a kitten!
Erwin knew about the Cat care centers and how their software wasn't safe.


Let's see how Erwin tried to do this.
In the OwnerServiceImpl class, there is a new method called checkLiabiliy:

@Override
public Mono<String> checkLiability(String name) {
    return ownerRepository.checkLiability(name);
}
Enter fullscreen mode Exit fullscreen mode

In the repository in class OwnerRepositoryImpl, it becomes more complex:

@Override
public Mono<String> checkLiability(String address) {
    return Flux
        .fromStream(() -> Arrays.stream(owners).filter(owner -> owner.getAddress().equalsIgnoreCase(address)))
        .reduce((owner, owner2) -> {
            if (owner.getRating() > owner2.getRating()) {
                return owner2;
            }
            return owner;
        }).map(owner -> owner.getRating() <= 0 ?
            "NOK" :
            "OK");
}
Enter fullscreen mode Exit fullscreen mode

So what we are doing her is to check if the owner is liable to have a cat.
Since Owner's are registered forever in the system, we can have a look if the addresses match.
The way it works is in that we first find if there are any owners in that address.
Then if there are, we check if their rating has gone lower than 0.
If so, then the owner get's a "NOK". If not, the owner gets an "OK".

We can test this also via unit tests:

class OwnerRepositoryImplTest {
    private final OwnerRepository ownerRepository = new OwnerRepositoryImpl(new ObjectMapper());

    OwnerRepositoryImplTest() throws IOException {
    }

    @Test
    void givenLiable_checkLiability_Ok() {
        Mono<String> verrekijkers = ownerRepository.checkLiability("Verrekijkers");
        final String block = verrekijkers.block();
        assertThat(block).isEqualTo("OK");
    }

    @Test
    void givenNonLiable_checkLiability_NOk() {
        Mono<String> the_swamp = ownerRepository.checkLiability("The Swamp");
        final String block = the_swamp.block();
        assertThat(block).isEqualTo("NOK");
    }
}
Enter fullscreen mode Exit fullscreen mode

The point of this last bit is to see how we can use lambdas to implement publishers with extra functionalities just like Java lambdas already do.

And so contrary to Erwin's expectations, the new identity card having Edwin in it, didn't save the day.
So now, poor Erwin, will have to renew his card and still live frustrated because once again he was denied the right to adopt a cat.


6. Testing with Blockhound

Blockhoud is a library developed quite recently and is available in GitHub

alt text

Blockhound

The developments on GitHub for this library started at the end of 2018.

Blockound uses markers placed on threads to determine if they are blocking or not:

BlockHound will transparently instrument the JVM classes and intercept blocking calls (e.g. IO) if they are performed from threads marked as "non-blocking operations only" (ie. threads implementing Reactor's NonBlocking marker interface, like those started by Schedulers.parallel()). If and when this happens (but remember, this should never happen!😜), an error will be thrown.

in Blockhound @ GitHub

First step to do is to add the necessary libraries to our project.
First we need our test framework for Spring Boot:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    <version>2.2.6.RELEASE</version>
</dependency>
Enter fullscreen mode Exit fullscreen mode

Given that we are running JUnit tests, we need the JUnit support:

<dependency>
    <groupId>io.projectreactor.tools</groupId>
    <artifactId>blockhound-junit-platform</artifactId>
    <version>1.0.2.RELEASE</version>
    <scope>test</scope>
</dependency>
Enter fullscreen mode Exit fullscreen mode

Finally we need the Blockhound core:

<dependency>
    <groupId>io.projectreactor.tools</groupId>
    <artifactId>blockhound</artifactId>
    <version>1.0.2.RELEASE</version>
    <scope>test</scope>
</dependency>
Enter fullscreen mode Exit fullscreen mode

This way we can test all our calls in class CatControllerImplTest:

@SpringBootTest(webEnvironment = RANDOM_PORT)
@ExtendWith(SpringExtension.class)
class CatControllerImplTest {
    final RestTemplate restTemplate = new RestTemplate();
    @Autowired
    private CatControllerImpl catController;
    @LocalServerPort
    private int port;
    static {
        BlockHound.install();
    }
    @Test
    void getCatByIdI_whenCall_nonBlocking() {
        Mono.delay(Duration.ofMillis(1))
                .doOnNext(it -> catController.getCatByIdI(1L))
                .block();
    }
    @Test
    void getFullCatById_whenCall_nonBlocking() {
        Mono.delay(Duration.ofMillis(1))
                .doOnNext(it -> catController.getFullCatById(1L))
                .block();
    }
    @Test
    void getAllCats_whenCall_nonBlocking() {
        Mono.delay(Duration.ofMillis(1))
                .doOnNext(it -> catController.getAllCats())
                .block();
    }
    @Test
    void getFullAllCats_whenCall_nonBlocking() {
        Mono.delay(Duration.ofMillis(1))
                .doOnNext(it -> catController.getFullAllCats())
                .block();
    }
(...)
Enter fullscreen mode Exit fullscreen mode

We need to pay special attention to this section of the code:

static {
    BlockHound.install();
}
Enter fullscreen mode Exit fullscreen mode

This is where Blockhound gets installed in order to be able to run the unit tests.
We can find multiple ways opf configuring this.
For this article we only need Blockhound.install().
Please bear in mind that BlockHound is Java Agent installed on a JVM level.

BlockHound is a Java Agent.
It instruments the pre-defined set of blocking methods (see customization) in the JVM and adds a special check before calling the callback (see Blocking call decision).

in How it works @ Blockhound GitHub repo

All of these @Test's follow the same test procedure.
Taking getFullAllCats_whenCall_nonBlocking as an example:

@Test
void getFullAllCats_whenCall_nonBlocking() {
    Mono.delay(Duration.ofMillis(1))
            .doOnNext(it -> catController.getFullAllCats())
            .block();
}
Enter fullscreen mode Exit fullscreen mode

The point of Duration.ofMillis(1) is just a standard way of configuring Blockhound.
There doesn't seem to be any difference in the unit test runtime, if we change the ofMillis parameter.
What does makes a difference is the call to block().
Once we do this, we will be triggering the test to see if our method is blocking or not.

This same procedure applies to our OwnerControllerTest:

@Test
void checkLiability_whenCallPositive_nonBlocking() {
    Mono.delay(Duration.ofMillis(1))
        .doOnNext(it -> ownerController.checkLiability("Verrekijkers"))
        .block();
}

@Test
void checkLiability_whenCallNegative_nonBlocking() {
    Mono.delay(Duration.ofMillis(1))
        .doOnNext(it -> ownerController.checkLiability("The swamp"))
        .block();
}
Enter fullscreen mode Exit fullscreen mode

These two extra tests check if our checkLiability calls are blocking or not.


We are almost at the end of our article, but before we continue to the conclusion we still need to see the difference.
At the moment it is probably not clear what is the difference in practical terms between making a blocking call and a reactive call.
If we make a unit test with the code described above, we will realize that Blockhound will still not detect a potential blocking method.
We need to test this somehow.
In our example, we are using the file system as our database.
Unfortunatelly for our experiments we run into two problems here:

  1. Accessing the file system doesn't necessarily marks the thread as blocking.
  2. Using Flux or Mono, doesn't automatically make the method non-blocking.
  3. Using wait or Thread.sleep automatically marks the method as blockign regardless of running with WebFlux or not.

However, we do know that Blockhound will not generate errors if we just run the detection procedure and our simulation calls are implemented in the publisher instead of the main callback.

For this, I have created two extra methods in the CatControllerImpl class:

public Flux<CatDto> getFullAllCatsReactiveForTest() {
    return this.catService.getFullAllCats().map(catDto -> {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return catDto;
    });
}

public List<CatDto> getFullAllCatsNonReactive() {
    try {
        Thread.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return this.catService.getFullAllCatsNonReactive();
}
Enter fullscreen mode Exit fullscreen mode

The first method, getFullAllCatsReactiveForTest, is a reactive method.
We don't make the unit tests to test its result.
Instead we are only going to use Blockhound to see if our method is blocking or not.
The second method, getFullAllCatsNonReactive, is a blocking method.
The Thread.sleep, occurs during the callback scope.
This means that Netty will receive a plain request response, only after waiting for 1 millisecond.

To make it more clear let's look at the unit tests that will help us understand this in class CatControllerImplTest:

    @Test
    void getFullAllCatsNonReaciveForTest_whenCall_Blocking() {
        Mono.delay(Duration.ofMillis(1))
            .doOnNext(it -> catController.getFullAllCatsReactiveForTest())
            .block();
    }
    @Test
    void getFullAllCatsNonReacive_whenCall_Blocking() {
        Mono.delay(Duration.ofMillis(1))
            .doOnNext(it -> catController.getFullAllCatsNonReactive())
            .block();
    }
Enter fullscreen mode Exit fullscreen mode

Now we have the second unit test failling.
This is the Error we should be getting at the moment:

reactor.core.Exceptions$ReactiveException: reactor.blockhound.BlockingOperationError: Blocking call! java.lang.Thread.sleep

    at reactor.core.Exceptions.propagate(Exceptions.java:393)
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:97)
    at reactor.core.publisher.Mono.block(Mono.java:1666)
    at org.jesperancinha.housing.controller.CatControllerImplTest.getFullAllCatsNonReacive_whenCall_Blocking(CatControllerImplTest.java:77)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:675)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:125)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:132)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:124)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:74)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:104)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:62)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:43)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:35)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:202)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:198)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1510)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1510)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:229)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:197)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:211)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:191)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
        ... 65 more
Caused by: reactor.blockhound.BlockingOperationError: Blocking call! java.lang.Thread.sleep
    at java.base/java.lang.Thread.sleep(Thread.java)
    at org.jesperancinha.housing.controller.CatControllerImpl.getFullAllCatsNonReactive(CatControllerImpl.java:51)
    at org.jesperancinha.housing.controller.CatControllerImplTest.lambda$getFullAllCatsNonReacive_whenCall_Blocking$5(CatControllerImplTest.java:76)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:177)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:117)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
    at java.base/java.lang.Thread.run(Thread.java:832)
Enter fullscreen mode Exit fullscreen mode

The way to fix this unit test is just to cause this test to expect this Error:

@Test
void getFullAllCatsNonReacive_whenCall_Blocking() {
    assertThatExceptionOfType(BlockingOperationError.class).isThrownBy(() -> Mono.delay(Duration.ofMillis(1))
        .doOnNext(it -> catController.getFullAllCatsNonReactive())
        .block());
}
Enter fullscreen mode Exit fullscreen mode

This was our last failing unit test, and this concludes our research into Reactive Programming with WebFlux and testing it with Blockhound


7. Conclusion

This article was written with a lot of positivity to highlight the basic ways in which to use Reactive programming with WebFlux.

All code for this example is available on my repo on GitHub Kitten House Care

I hope that you enjoyed this article as much as I enjoyed writing it.

If you enjoyed it, please share it in the links below.

Take care, stay interested, stay logic, stay safe!

Generic badge

If you are interested in Reactive programming, you'll probably find the following slides very useful:

Reactive Programming With S... by João Esperancinha

or you may find the same presentation on slide-share:

Reactive programming with Spring Webflux.pptx from João Esperancinha

8. Resources

8.1. Books and papers

8.2. Websites


If you enjoyed this presentation, you probably also would like to learn more about reactive programming. I have created a tutorial about best practices on creating a reactive web application with Kotlin coroutines and you can find the 3 episode set over here:

Or you prefer to watch all of them in one go, they are also available in one single video form on DailyMotion for you:

Top comments (3)

Collapse
 
siy profile image
Sergiy Yevtushenko

Reactive streams have several issues, which limit their usability:

  • artificial mental model "everything is a stream"
  • too much low level technical details are leaking into business logic
  • pull model has several issues, from "cold"/"hot" streams difference to susceptibility to memory hogging or loss of events and backpressure tuning

Overall reactive streams are quite complicated and inconvenient, despite significant resources invested into development and tuning of various implementations.

Collapse
 
jofisaes profile image
João Esperancinha

I don't disagree. The real benefit of reactive seems to be resilience. Having said that, even with coroutines, the level of complexity may outweigh the benefits in many cases. It is really a question of trying per case and see if it performs well. Benchmarking and performance tests are crucial to make correct decisions. The concepts of "cold" and "hot" streams may feel complicated as you mention and in many cases, they are. What reactive streams don't do is something that I often see defended and it is that "they are faster!". That's an odd concept. They may perform better or worse than the traditional blocking MVC model, but in any case, the resistance to backpressure should most of the times be higher with reactive streams. Even if it is just a little bit.

Collapse
 
siy profile image
Sergiy Yevtushenko

I think that there should be no complexity barrier and asynchronous processing should be as simple and convenient to use as synchronous code. This is achievable by using functional style Future/Promise API.