At the beginning of 2020, I was trying to make an example to illustrate the benefits of using a Reactive Programming model.
I already had an idea of an application that I wanted to make, and because I love cats, I wanted to make an application to register cats found in the wild.
This way, cats could be helped by being redirected to a cat care house in order to find an available carer.
The idea of making the application reactive was also to be able to give a presentation about it. Unfortunately, the presentation never took place.
However, I was able to finish the project I wanted to make just in time.
In this article I will share with you my experience in creating this application.
I find this to be very helpful if you are looking to grasp the intricacies of Reactive Programming.
Together, we will go through how reactive programming can make our programming models easier and faster.
What we should know is that Reactive programming relies heavily on the observer pattern.
In short, an observer pattern is one where we have two major players:
- A publisher - An entity that emits events to its subscribers
- A subscriber - An entity that subscribes to the publisher and does something with the data generated by the publisher.
The source code for this project can be found here:
Reactive programming is currently viewed as one of the most popular programming paradigms.
However, it isn't a new concept.
It has existed since the 1960's under a different name: data flows.
1. Introduction
1960's
In order to illustrate the way the concept of Reactive programming started, we have to go back to the 1960's in a time capsule.
At that time Jack Dennis and his graduate team at MIT developed concepts surrounding data and how data could be efficiently transmitted.
The concept invented was that of data flows.
Dataflow programming concepts were pioneered in this decade thanks to Jack Dennis's inventive work.
This kind of programming gained momentum between 1974 and 1975.
Along these years, the data stream concept and its interactions were further developed.
The ancestors of data flow are:
- Asynchronous Digital Logic
- Control Structures for Parallel Programming
- Abstract Models for Concurrent Systems
- Theory of Program Schemes
- Structured Programming
- Functional Programming
What data flows wanted to show is that data can be emitted as a reaction to events in a system.
"For every action, there is an equal and opposite reaction."
Newton's third law
What this means is that the idea was also to find a way to work better with a decentralized system.
Using the traditional sequential, procedural, or control-flow paradigms represented a problem.
This problem was the blocking nature of such paradigms.
The best way to solve this was to parallelize what could be parallelized.
Therefore, the dataflow paradigm finally settled into one where several black boxes are connected together.
Connections are made by the use of several inputs.
When all the inputs of one black box are satisfied, the black box runs.
In 1961, a paper called "A Block Diagram Compiler" was published, introducing a new programming language.
BLODI, the new language, short for BLOck DIagram, was the first programming language to follow the data flow paradigm.
Its application was in electronic circuits.
It was designed by John Larry Kelly Jr. (1923-1965), Carol Lochbaum, and Victor A. Vyssotsky (1931-2012).
A presentation by Jack Dennis was written for MIT and is available directly at:
2010
In 2010, Erik Meijer, a computer scientist working for Microsoft, coined the term Reactive Programming.
Up until that date, Erik Meijer had been involved in the development of different .NET technologies.
These included C#, Visual Basic, LINQ, and Volta.
The first framework ever to be referred to as reactive was Rx (Reactive Extensions), which appeared in 2009.
It was then called a "watershed advance in asynchronous programming".
The reason is very simple.
One of the most important paradigms of Reactive programming is its non-blocking nature.
Non-blocking also means that we try to perform separate small tasks independently and asynchronously.
In 2010, the creation of Reactive Extensions for .NET revolutionized the way we look at software development.
Reactive programming, an old concept, was brought back to life.
Reactive Extensions for .NET is nowadays called ReactiveX, and it is available in most common programming languages.
Reactive programming is commonly referred to as dataflow programming on steroids.
The reason is that it is essentially an extension of the dataflow programming paradigm.
On November 17th, 2009, Erik Meijer was interviewed on Channel 9 about Rx. The video isn't available online anymore.
This interview used to be available online on their website: Erik-Meijer-Rx-in-15-Minutes
Reactive Manifesto
The Reactive Manifesto was created in 2013 by Jonas Bonér.
This manifesto applies to systems and is often confused with the idea of reactive programming.
Both reactive programming and reactive systems want decoupling to happen, they want to run as soon as certain conditions are met, and they want to resort to parallelism as much as possible.
In other words, these two concepts have in common that they want maximum availability and asynchronicity whenever possible.
This is probably where the similarities end.
A reactive system must obey the following rules, or at least explore them as much as possible:
- Responsive: It needs to respond quickly.
  - The time of the request itself is independent of this.
  - Getting an OK which confirms the request is ongoing is more important than responding with the completed request.
- Resilient: It must respond well under failure and support back-pressure.
  - Use control messages.
  - Avoid catastrophic failure.
- Elastic: Automatic allocation of resources:
  - Threads
  - Memory
  - CPU
- Message Driven: Publisher/Subscriber.
Observer Pattern
The observer pattern is a behavioral design pattern that lets you define a subscription mechanism to notify multiple objects about any events that happen to the object they’re observing.
In other words, we are going to build software with declarative programming instead of imperative programming.
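To make this concrete, here is a minimal, self-contained sketch of the pattern in plain Java. The `CatSightingPublisher` class and its names are illustrative for this article only, not the project's classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// A tiny illustration of the observer pattern (names are made up for this sketch)
class CatSightingPublisher {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // A subscriber registers interest in the publisher's events
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // The publisher pushes each event to every registered subscriber
    void publish(String catName) {
        subscribers.forEach(subscriber -> subscriber.accept(catName));
    }
}

public class ObserverSketch {
    public static void main(String[] args) {
        CatSightingPublisher sightings = new CatSightingPublisher();
        sightings.subscribe(cat -> System.out.println("Registering " + cat));
        sightings.publish("Lania"); // prints "Registering Lania"
    }
}
```

The subscriber never polls the publisher; it only reacts when an event is pushed to it, which is the core idea reactive libraries build on.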
Spring Webflux
Reactive programming basically means that we are programming in a reactive way.
A reactive way means that calls are not blocked, and waiting is something we want to avoid.
From the Mono documentation we get the following:
A Mono is a specialized Publisher that emits at most one item and then (optionally) terminates with an onComplete signal or an onError signal.
This means, and we'll see this later in action, that `Mono`s are meant for single elements.
This is important to keep in mind because we also want to make sure we can, in some way, transmit a stream of elements.
In our example, we'll use this to transmit one single Cat at a time.
From the Flux documentation we have that:
A Flux is a standard Publisher that represents an asynchronous sequence of 0 to N emitted items, optionally terminated by either a completion signal or an error. As in the Reactive Streams spec, these three types of signal translate to calls to a downstream Subscriber’s onNext, onComplete, and onError methods.
In other words, this means that we'll use `Flux` to transmit a list of items.
In our case, a list of `CatDto`s.
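To make the distinction concrete before we apply it, here is a minimal Reactor sketch (assuming `reactor-core` is on the classpath; the cat names are just sample data):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoFluxSketch {
    public static void main(String[] args) {
        // Mono: emits at most one item, then completes
        Mono<String> oneCat = Mono.just("Lania");

        // Flux: emits 0 to N items, then completes
        Flux<String> allCats = Flux.just("Lania", "Mit");

        // Nothing is emitted until somebody subscribes
        oneCat.subscribe(name -> System.out.println("Mono emitted: " + name));
        allCats.subscribe(name -> System.out.println("Flux emitted: " + name));
    }
}
```

Both types are lazy: declaring the pipeline does nothing by itself, which is exactly what makes the non-blocking composition later in this article possible.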
2. Environment Setup
The project we will be looking at is developed in Java 14.
If you have questions about setting up Java 14, then please look up more information in my Hints & Tricks document.
To be able to run the unit tests for the Kitten Care Project, we need a special configuration.
Without it, our BlockHound release, version 1.0.3-RELEASE, will give this error back:
/Users/jofisaes/.sdkman/candidates/java/14.0.0.hs-adpt/bin/java -ea -Didea.test.cyclic.buffer.size=1048576 -javaagent:/Applications/IntelliJ IDEA 2020.2 EAP.app/Contents/lib/idea_rt.jar=52479:/Applications/IntelliJ IDEA 2020.2 EAP.app/Contents/bin -Dfile.encoding=UTF-8 -classpath /Applications/IntelliJ IDEA 2020.2 EAP.app/Contents/lib/idea_rt.jar:/Users/jofisaes/.m2/repository/org/junit/platform/junit-platform-launcher/1.5.2/junit-platform-launcher-1.5.2.jar:/Applications/IntelliJ IDEA 2020.2 EAP.app/Contents/plugins/junit/lib/junit5-rt.jar:/Applications/IntelliJ IDEA 2020.2 EAP.app/Contents/plugins/junit/lib/junit-rt.jar:/Users/jofisaes/dev/src/jofisaes/kitten-house-care-parent/kitten-house-care/target/test-classes:/Users/jofisaes/dev/src/jofisaes/kitten-house-care-parent/kitten-house-care/target/classes:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-starter-webflux/2.2.6.RELEASE/spring-boot-starter-webflux-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-starter/2.2.6.RELEASE/spring-boot-starter-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot/2.2.6.RELEASE/spring-boot-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/2.2.6.RELEASE/spring-boot-autoconfigure-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-starter-logging/2.2.6.RELEASE/spring-boot-starter-logging-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar:/Users/jofisaes/.m2/repository/ch/qos/logback/logback-core/1.2.3/logback-core-1.2.3.jar:/Users/jofisaes/.m2/repository/org/apache/logging/log4j/log4j-to-slf4j/2.12.1/log4j-to-slf4j-2.12.1.jar:/Users/jofisaes/.m2/repository/org/apache/logging/log4j/log4j-api/2.12.1/log4j-api-2.12.1.jar:/Users/jofisaes/.m2/repository/org/slf4j/jul-to-slf4j/1.7.30/jul-to-slf4j-1.7.30.jar:/Users/jofisaes/.m2/repository/jakarta/annotation/
jakarta.annotation-api/1.3.5/jakarta.annotation-api-1.3.5.jar:/Users/jofisaes/.m2/repository/org/yaml/snakeyaml/1.25/snakeyaml-1.25.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-starter-json/2.2.6.RELEASE/spring-boot-starter-json-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.10.3/jackson-databind-2.10.3.jar:/Users/jofisaes/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.10.3/jackson-annotations-2.10.3.jar:/Users/jofisaes/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.10.3/jackson-core-2.10.3.jar:/Users/jofisaes/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jdk8/2.10.3/jackson-datatype-jdk8-2.10.3.jar:/Users/jofisaes/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jsr310/2.10.3/jackson-datatype-jsr310-2.10.3.jar:/Users/jofisaes/.m2/repository/com/fasterxml/jackson/module/jackson-module-parameter-names/2.10.3/jackson-module-parameter-names-2.10.3.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-starter-reactor-netty/2.2.6.RELEASE/spring-boot-starter-reactor-netty-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/io/projectreactor/netty/reactor-netty/0.9.6.RELEASE/reactor-netty-0.9.6.RELEASE.jar:/Users/jofisaes/.m2/repository/io/netty/netty-codec-http/4.1.48.Final/netty-codec-http-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-common/4.1.48.Final/netty-common-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-buffer/4.1.48.Final/netty-buffer-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-transport/4.1.48.Final/netty-transport-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-codec/4.1.48.Final/netty-codec-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-codec-http2/4.1.48.Final/netty-codec-http2-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-handler/4.1.48.Final/netty-handler-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty
/netty-resolver/4.1.48.Final/netty-resolver-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-handler-proxy/4.1.48.Final/netty-handler-proxy-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-codec-socks/4.1.48.Final/netty-codec-socks-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/io/netty/netty-transport-native-epoll/4.1.48.Final/netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:/Users/jofisaes/.m2/repository/io/netty/netty-transport-native-unix-common/4.1.48.Final/netty-transport-native-unix-common-4.1.48.Final.jar:/Users/jofisaes/.m2/repository/org/glassfish/jakarta.el/3.0.3/jakarta.el-3.0.3.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-starter-validation/2.2.6.RELEASE/spring-boot-starter-validation-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/jakarta/validation/jakarta.validation-api/2.0.2/jakarta.validation-api-2.0.2.jar:/Users/jofisaes/.m2/repository/org/hibernate/validator/hibernate-validator/6.0.18.Final/hibernate-validator-6.0.18.Final.jar:/Users/jofisaes/.m2/repository/org/jboss/logging/jboss-logging/3.4.1.Final/jboss-logging-3.4.1.Final.jar:/Users/jofisaes/.m2/repository/com/fasterxml/classmate/1.5.1/classmate-1.5.1.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-web/5.2.5.RELEASE/spring-web-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-beans/5.2.5.RELEASE/spring-beans-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-webflux/5.2.5.RELEASE/spring-webflux-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/io/projectreactor/reactor-core/3.3.4.RELEASE/reactor-core-3.3.4.RELEASE.jar:/Users/jofisaes/.m2/repository/org/reactivestreams/reactive-streams/1.0.3/reactive-streams-1.0.3.jar:/Users/jofisaes/.m2/repository/org/synchronoss/cloud/nio-multipart-parser/1.1.0/nio-multipart-parser-1.1.0.jar:/Users/jofisaes/.m2/repository/org/slf4j/slf4j-api/1.7.30/slf4j-api-1.7.30.jar:/Users/jofisaes/.m2/repository/org/synchronoss/cloud/nio-stre
am-storage/1.1.3/nio-stream-storage-1.1.3.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-starter-web/2.2.6.RELEASE/spring-boot-starter-web-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-starter-tomcat/2.2.6.RELEASE/spring-boot-starter-tomcat-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/org/apache/tomcat/embed/tomcat-embed-core/9.0.33/tomcat-embed-core-9.0.33.jar:/Users/jofisaes/.m2/repository/org/apache/tomcat/embed/tomcat-embed-el/9.0.33/tomcat-embed-el-9.0.33.jar:/Users/jofisaes/.m2/repository/org/apache/tomcat/embed/tomcat-embed-websocket/9.0.33/tomcat-embed-websocket-9.0.33.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-webmvc/5.2.5.RELEASE/spring-webmvc-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-aop/5.2.5.RELEASE/spring-aop-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-context/5.2.5.RELEASE/spring-context-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-expression/5.2.5.RELEASE/spring-expression-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/org/projectlombok/lombok/1.18.12/lombok-1.18.12.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-starter-test/2.2.6.RELEASE/spring-boot-starter-test-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-test/2.2.6.RELEASE/spring-boot-test-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/boot/spring-boot-test-autoconfigure/2.2.6.RELEASE/spring-boot-test-autoconfigure-2.2.6.RELEASE.jar:/Users/jofisaes/.m2/repository/com/jayway/jsonpath/json-path/2.4.0/json-path-2.4.0.jar:/Users/jofisaes/.m2/repository/net/minidev/json-smart/2.3/json-smart-2.3.jar:/Users/jofisaes/.m2/repository/net/minidev/accessors-smart/1.2/accessors-smart-1.2.jar:/Users/jofisaes/.m2/repository/org/ow2/asm/asm/5.0.4/asm-5.0.4.jar:/Users/jofisaes/.m2/repository/jakarta/xml/bind/jakarta.xml.bind-api/2.3.3/jakarta.x
ml.bind-api-2.3.3.jar:/Users/jofisaes/.m2/repository/jakarta/activation/jakarta.activation-api/1.2.2/jakarta.activation-api-1.2.2.jar:/Users/jofisaes/.m2/repository/org/junit/vintage/junit-vintage-engine/5.5.2/junit-vintage-engine-5.5.2.jar:/Users/jofisaes/.m2/repository/junit/junit/4.12/junit-4.12.jar:/Users/jofisaes/.m2/repository/org/mockito/mockito-junit-jupiter/3.1.0/mockito-junit-jupiter-3.1.0.jar:/Users/jofisaes/.m2/repository/org/hamcrest/hamcrest/2.1/hamcrest-2.1.jar:/Users/jofisaes/.m2/repository/org/mockito/mockito-core/3.1.0/mockito-core-3.1.0.jar:/Users/jofisaes/.m2/repository/net/bytebuddy/byte-buddy/1.10.8/byte-buddy-1.10.8.jar:/Users/jofisaes/.m2/repository/net/bytebuddy/byte-buddy-agent/1.10.8/byte-buddy-agent-1.10.8.jar:/Users/jofisaes/.m2/repository/org/objenesis/objenesis/2.6/objenesis-2.6.jar:/Users/jofisaes/.m2/repository/org/skyscreamer/jsonassert/1.5.0/jsonassert-1.5.0.jar:/Users/jofisaes/.m2/repository/com/vaadin/external/google/android-json/0.0.20131108.vaadin1/android-json-0.0.20131108.vaadin1.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-core/5.2.5.RELEASE/spring-core-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-jcl/5.2.5.RELEASE/spring-jcl-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/org/springframework/spring-test/5.2.5.RELEASE/spring-test-5.2.5.RELEASE.jar:/Users/jofisaes/.m2/repository/org/xmlunit/xmlunit-core/2.6.4/xmlunit-core-2.6.4.jar:/Users/jofisaes/.m2/repository/io/projectreactor/tools/blockhound-junit-platform/1.0.3.RELEASE/blockhound-junit-platform-1.0.3.RELEASE.jar:/Users/jofisaes/.m2/repository/io/projectreactor/tools/blockhound/1.0.3.RELEASE/blockhound-1.0.3.RELEASE.jar:/Users/jofisaes/.m2/repository/org/junit/jupiter/junit-jupiter/5.5.2/junit-jupiter-5.5.2.jar:/Users/jofisaes/.m2/repository/org/junit/jupiter/junit-jupiter-api/5.5.2/junit-jupiter-api-5.5.2.jar:/Users/jofisaes/.m2/repository/org/opentest4j/opentest4j/1.2.0/opentest4j-1.2.0.jar:/Users/jofisaes/.m2/repository
/org/junit/platform/junit-platform-commons/1.5.2/junit-platform-commons-1.5.2.jar:/Users/jofisaes/.m2/repository/org/junit/jupiter/junit-jupiter-params/5.5.2/junit-jupiter-params-5.5.2.jar:/Users/jofisaes/.m2/repository/org/junit/jupiter/junit-jupiter-engine/5.5.2/junit-jupiter-engine-5.5.2.jar:/Users/jofisaes/.m2/repository/org/apiguardian/apiguardian-api/1.1.0/apiguardian-api-1.1.0.jar:/Users/jofisaes/.m2/repository/org/junit/platform/junit-platform-engine/1.5.2/junit-platform-engine-1.5.2.jar:/Users/jofisaes/.m2/repository/org/assertj/assertj-core/3.13.2/assertj-core-3.13.2.jar com.intellij.rt.junit.JUnitStarter -ideVersion5 -junit5 org.jesperancinha.housing.controller.CatControllerImplTest,getFullAllCatsNonReacive_whenCall_Blocking
OpenJDK 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended
Exception in thread "main" java.util.ServiceConfigurationError: org.junit.platform.launcher.TestExecutionListener: Provider reactor.blockhound.junit.platform.BlockHoundTestExecutionListener could not be instantiated
at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:584)
at java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:806)
at java.base/java.util.ServiceLoader$ProviderImpl.get(ServiceLoader.java:724)
at java.base/java.util.ServiceLoader$3.next(ServiceLoader.java:1396)
at java.base/java.lang.Iterable.forEach(Iterable.java:74)
at org.junit.platform.launcher.core.LauncherFactory.create(LauncherFactory.java:94)
at org.junit.platform.launcher.core.LauncherFactory.create(LauncherFactory.java:67)
at com.intellij.junit5.JUnit5IdeaTestRunner.createListeners(JUnit5IdeaTestRunner.java:46)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:31)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: java.lang.ExceptionInInitializerError
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
at java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:782)
... 9 more
Caused by: java.lang.IllegalStateException: The instrumentation have failed.
It looks like you're running on JDK 13+.
You need to add '-XX:+AllowRedefinitionToAddDeleteMethods' JVM flag.
See https://github.com/reactor/BlockHound/issues/33 for more info.
at reactor.blockhound.BlockHound$Builder.testInstrumentation(BlockHound.java:435)
at reactor.blockhound.BlockHound$Builder.install(BlockHound.java:401)
at reactor.blockhound.BlockHound.install(BlockHound.java:94)
at reactor.blockhound.junit.platform.BlockHoundTestExecutionListener.<clinit>(BlockHoundTestExecutionListener.java:19)
... 15 more
Process finished with exit code 1
As mentioned in the exception, we only need to add this flag in our IntelliJ run configuration (or whatever IDE you are using; this is of course also valid via the command line): `-XX:+AllowRedefinitionToAddDeleteMethods`.
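For Maven builds, the same flag can be passed to the test JVM through the Surefire plugin. This is a minimal sketch; the plugin version shown is an assumption, so adjust it to your build:

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-surefire-plugin</artifactId>
    <version>2.22.2</version>
    <configuration>
        <!-- Required by BlockHound on JDK 13+ -->
        <argLine>-XX:+AllowRedefinitionToAddDeleteMethods</argLine>
    </configuration>
</plugin>
```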
Bear in mind a few important things about BlockHound:
- It is still a test tool in the making.
- Version 1.0.3-RELEASE works very well with Java 14.
- BlockHound will detect blocking calls, which makes simulating a pause difficult, because methods like `wait` and `Thread.sleep` will block the main thread.
- Blocking the main thread always results in a blocking-call detection, so a unit test that checks the response data this way will fail.
- During a simple BlockHound test, simulated blocking calls outside the callback scope aren't going to be detected.
3. Implementation
To implement with WebFlux, we need to start with the dependencies.
In this case, we are going to use Maven.
We need, of course, the WebFlux library:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
Then we need the framework to run our application and get support for our MVC pattern:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
Having these dependencies in our Maven configuration is what's important.
In our case, we are using the Maven compiler plugin in order to compile our project with JDK 14:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>14</source>
<target>14</target>
</configuration>
</plugin>
</plugins>
</build>
Let's take a top-to-bottom strategy.
First, we implement our REST services.
This is all implemented in our controller class, `CatController`.
Our application will follow the typical MVC model, but instead of the traditional MVC way, we will implement it in a reactive way.
First we need a data model and for that let's have a look at our first kitten:
{
"id": 1,
"name": "Lania",
"color": "orange",
"species": "Katachtigen",
"pattern": "stripped",
"age": 4,
"formerOwners": [
1
],
"careCenters": [
1
]
}
What we have here is a cat with the name "Lania".
The way to interpret this data is:
- We have a cat named Lania.
- The cat's color is orange.
- The cat's species is Katachtigen.
- The fur pattern is stripped.
- The cat is 4 years old.
- The cat has had one former owner (the owner with id 1).
{
"id": 2,
"name": "Mit",
"color": "black and white",
"species": "Katachtigen",
"pattern": "spotted",
"age": 9,
"formerOwners": [
2
],
"careCenters": [
1
]
}
And this is another cat, called Mit. Mit differs in that:
- Mit is 9 years old.
- He is black and white.
- The pattern on his fur is spotted.
We also need the data of the care center for the cats.
Here is one example: a list of centers.
In our case, there is only one element in it:
[
{
"id": 1,
"name": "Nieuwegein Kitten Center",
"address": "Kittenstraat",
"refNumber": "23ABC",
"city": "Nieuwegein",
"postCode": "9999CC",
"country": "Nederland"
}
]
Finally, we want the information of potential owners.
We also make an array of available owners:
[
{
"id": 1,
"name": "Eng. João Esperancinha",
"address": "Verrekijkers",
"rating": 10
},
{
"id": 2,
"name": "Eng. Erwin Zieligofski",
"address": "The swamp",
"rating": 0
}
]
Here we have two owners.
To keep it simple, we are only interested in three properties for these owners: their `name`, `address`, and `rating`.
So, we can see here that the best owner is of course João Esperancinha and the worst owner is Erwin Zieligofski.
Now that we have some more understanding of the data we are going to need, we can have a look at the REST implementation.
This is the interface of our REST controller:
@RestController
@RequestMapping("/cats")
public interface CatController {
@GetMapping
Flux<CatDto> getAllCats();
@GetMapping("/full")
Flux<CatDto> getFullAllCats();
@GetMapping("/{catId}")
Mono<CatDto> getCatByIdI(@PathVariable Long catId);
@GetMapping("/full/{catId}")
Mono<CatDto> getFullCatById(@PathVariable Long catId);
}
We will read data from our cats database via the path `/cats`.
Here we have the two publisher types available in the Spring WebFlux library: `Mono` and `Flux`.
We'll look into Mono and Flux in detail later.
We are now only concerned with getting the architecture together.
For our application, we just need to know that we want to get a complete list of available cats, a cat by id, and, with the last two methods, the same data in its complete form.
Let's now have a look at the REST service implementation:
@RestController
@RequestMapping("/cats")
public class CatControllerImpl implements CatController {
private final CatService catService;
public CatControllerImpl(CatService catService) {
this.catService = catService;
}
public Flux<CatDto> getAllCats() {
return this.catService.getAllCats();
}
public Flux<CatDto> getFullAllCats() {
return this.catService.getFullAllCats();
}
public Mono<CatDto> getCatByIdI(Long catId) {
return this.catService.getCatById(catId);
}
public Mono<CatDto> getFullCatById(Long catId) {
return this.catService.getFullCatById(catId);
}
}
The implementation of the `RestController` doesn't differ that much from the implementation of a common MVC REST controller.
We can see that the difference lies in the fact that, instead of returning something like a `ResponseEntity`, we are returning our publishers.
All of our REST controller methods call methods of the `CatService`.
The `CatService` is where our business logic lies.
For the current application, there isn't really that much business logic; it works more like a mapper between the DTOs and the data objects.
Our `CatService` interface is very simple:
public interface CatService {
Mono<CatDto> getCatById(Long id);
Mono<CatDto> getFullCatById(Long id);
Flux<CatDto> getFullAllCats();
Flux<CatDto> getAllCats();
}
Its implementation is only a bit more complicated:
@Service
public class CatServiceImpl implements CatService {
private final CatRepository catRepository;
private final OwnerRepository ownerRepository;
private final CareCenterRepository careCenterRepository;
public CatServiceImpl(CatRepository catRepository, OwnerRepository ownerRepository,
CareCenterRepository careCenterRepository) {
this.catRepository = catRepository;
this.ownerRepository = ownerRepository;
this.careCenterRepository = careCenterRepository;
}
@Override
public Mono<CatDto> getCatById(Long id) {
return catRepository.getCatById(id).map(CatConverter::toDto);
}
@Override
public Mono<CatDto> getFullCatById(Long id) {
return catRepository.getCatById(id).map(cat -> {
CatDto catDto = CatConverter.toDto(cat);
return Mono.zip(ownerRepository.getOwnersByIds(cat.getFormerOwners())
.map(owners -> {
owners.forEach(owner -> catDto.getFormerOwners().add(OwnerConverter.toDto(owner)));
return owners;
})
.subscribeOn(Schedulers.parallel()), careCenterRepository.getCareCentersByIds(cat.getCareCenters())
.map(careCenters -> {
careCenters
.forEach(careCenter -> catDto.getCareCenters().add(CareCenterConverter.toDto(careCenter)));
return careCenters;
})
.subscribeOn(Schedulers.parallel()), (owners, cares) -> catDto)
.subscribeOn(Schedulers.parallel());
}).flatMap(Mono::from).subscribeOn(Schedulers.parallel());
}
@Override
public Flux<CatDto> getFullAllCats() {
return Flux.merge(getFullCatById(1L), getFullCatById(2L));
}
@Override
public Flux<CatDto> getAllCats() {
return Flux.merge(getCatById(1L), getCatById(2L));
}
}
On this occasion, the complexity of our implementation has increased considerably.
The reason is that our REST controller depends on a few separate database entities.
It is important to remind ourselves that we don't actually have a database.
The database is purely emulated by using the file system with static data.
This is also the reason why some ids have been hardcoded.
Before looking at what the `CatService` can give us, let's look at all the repositories we are going to use.
Cats will be managed via `CareCenter`s and will have a sort of legal guardian registered as an `Owner`.
For the care centers we first define our interface:
public interface CareCenterRepository {
Mono<CareCenter> getCareCenterById(Long id);
Mono<List<CareCenter>> getCareCentersByIds(List<Long> careCenters);
}
And then we have our implementation:
@Repository
public class CareCenterRepositoryImpl implements CareCenterRepository {
private final CareCenter[] careCenters;
public CareCenterRepositoryImpl(ObjectMapper objectMapper) throws IOException {
this.careCenters = objectMapper
.readValue(getClass().getResourceAsStream("/carecenters.json"), CareCenter[].class);
}
@Override
public Mono<CareCenter> getCareCenterById(Long id) {
return Mono.fromCallable(
() -> Arrays.stream(careCenters).filter(careCenter -> careCenter.getId().equals(id)).findFirst()
.orElse(null));
}
@Override
public Mono<List<CareCenter>> getCareCentersByIds(List<Long> careCenteIds) {
return Mono.fromCallable(
() -> Arrays.stream(careCenters).filter(careCenter -> careCenteIds.contains(careCenter.getId()))
.collect(Collectors.toList()));
}
}
We create this repository just like all the other repositories.
As a first step, we initialize our cached data by reading the data files in `resources`.
Then we are left with two methods: `getCareCenterById` and `getCareCentersByIds`.
We have mentioned before that `Flux` stands for lists.
Our `getCareCentersByIds` doesn't return a `Flux`, though.
Instead, it returns a `Mono` of a `List` of care centers.
This is important to understand.
We could have returned it this way:
@Override
public Flux<CareCenter> getCareCentersByIds(List<Long> careCenteIds) {
return Flux.fromStream(
() -> Arrays.stream(careCenters).filter(careCenter -> careCenteIds.contains(careCenter.getId())));
}
This way we would have gotten a `Flux` of `CareCenter`s.
This will become clearer further down this article.
For the moment, we just need to know that a `Flux` works pretty much like a stream, which, depending on what we want to do, can make some `zip` operations much more complicated than they should be.
Furthermore, it does not add any performance value in some of these cases, and in others, the `zip` operation is just impossible.
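To illustrate why the `Mono` of a `List` shape zips so easily, here is a small sketch (assuming `reactor-core`; the integers stand in for owners and care centers):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;

public class ZipSketch {
    public static void main(String[] args) {
        // Two independent "repository" results, each a Mono of a complete list
        Mono<List<Integer>> owners = Flux.just(1, 2).collectList();
        Mono<List<Integer>> careCenters = Flux.just(3).collectList();

        // zip waits for both Monos and combines their results in one step;
        // pairing two open-ended Fluxes element by element would not make sense here
        List<Integer> combined = Mono.zip(owners, careCenters, (o, c) -> {
            List<Integer> all = new ArrayList<>(o);
            all.addAll(c);
            return all;
        }).block();

        System.out.println(combined); // [1, 2, 3]
    }
}
```

This is the same shape `getFullCatById` relies on: each repository hands back one complete list, and `Mono.zip` fires once both are ready.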
Let's move on to our `OwnerRepository` interface:
public interface OwnerRepository {
Mono<Owner> getOwnerById(Long id);
Mono<List<Owner>> getOwnersByIds(List<Long> formerOwners);
}
For the same reasons we mentioned for the `CareCenterRepository`, we are using a `Mono` of a `List` here instead of a `Flux`.
This is its implementation:
@Repository
public class OwnerRepositoryImpl implements OwnerRepository {
private final Owner[] owners;
public OwnerRepositoryImpl(ObjectMapper objectMapper) throws IOException {
this.owners = objectMapper.readValue(getClass().getResourceAsStream("/owners.json"), Owner[].class);
}
@Override
public Mono<Owner> getOwnerById(Long id) {
return Mono.fromCallable(
() -> Arrays.stream(owners).filter(owner -> owner.getId().equals(id)).findFirst().orElse(null));
}
@Override
public Mono<List<Owner>> getOwnersByIds(List<Long> formerOwners) {
return Mono.fromCallable(() -> Arrays.stream(owners).filter(owner -> formerOwners.contains(owner.getId())).collect(
Collectors.toList()));
}
}
Finally, we just need to have a quick look at the CatRepository:
public interface CatRepository {
Mono<Cat> getCatById(Long id);
}
And its implementation:
@Repository
public class CatRepositoryImpl implements CatRepository {
private final Cat cat1;
private final Cat cat2;
public CatRepositoryImpl(ObjectMapper objectMapper) throws IOException {
this.cat1 = objectMapper.readValue(getClass().getResourceAsStream("/cat1.json"), Cat.class);
this.cat2 = objectMapper.readValue(getClass().getResourceAsStream("/cat2.json"), Cat.class);
}
@Override
public Mono<Cat> getCatById(Long id) {
return Mono.fromCallable(() -> {
if (id.intValue() == 1L) {
return cat1;
}
if (id.intValue() == 2L) {
return cat2;
}
return null;
});
}
}
This is an extremely hardcoded repository, done this way just to support our tests and experiments.
Our repository implementation provides only one method: getCatById.
With this method we can get the cats we want by id.
Looking back at the CatServiceImpl, we now know enough to start analysing this class.
Our service provides methods that are important to understand WebFlux.
We can start by checking the method getCatById:
@Override
public Mono<CatDto> getCatById(Long id) {
return catRepository.getCatById(id).map(CatConverter::toDto);
}
What we are doing in this method is just making sure that we separate the back-end layer from the layer facing the front-end.
Just as in traditional MVC architectures, we separate our DTO (Data Transfer Object) from the persistence layer.
In our specific case, the conversion seems pointless, but this is only because we are not transforming the data that much.
Normally, for more complicated cases, the data streamed to the public domain is filtered, enriched, or changed in some other way.
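For reference, a converter of this kind can be as small as a static mapping method. The sketch below is hypothetical: the real CatConverter in the repository may differ, and the field set is trimmed to a minimum just to show the shape of the mapping.

```java
public class ConverterSketch {

    // Minimal persistence-side entity (hypothetical, trimmed down).
    record Cat(Long id, String name, int age) {}

    // Minimal DTO facing the front-end: note it deliberately drops the id.
    record CatDto(String name, int age) {}

    // The converter: a pure function from entity to DTO.
    static CatDto toDto(Cat cat) {
        return new CatDto(cat.name(), cat.age());
    }

    public static void main(String[] args) {
        System.out.println(toDto(new Cat(1L, "Lania", 4)));
    }
}
```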
In this case, we can clearly see that we are getting a Cat.
At least, we will be getting a Cat in the front end.
However, this isn't exactly the case.
Although we eventually get a Cat in the front end, our Cat is going to be published to the front-end rather than simply returned.
We'll see this in further detail along this article.
For now, let's have a better look at what's happening in the back-end, in our repository layer:
@Override
public Mono<Cat> getCatById(Long id) {
return Mono.fromCallable(() -> {
if (id.intValue() == 1L) {
return cat1;
}
if (id.intValue() == 2L) {
return cat2;
}
return null;
});
}
We are returning a Mono of a cat and not the cat itself.
What this means is that we are returning an implementation of a publishing function in the callback of the REST call.
By doing this, we are ensuring that these hardcoded if statements are only executed after the callback.
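The deferral that Mono.fromCallable gives us can be illustrated without Reactor at all. In the sketch below, using only the standard library, a plain Callable plays the role of the publisher body: the counter proves that nothing runs at assembly time, only when someone finally "subscribes" by invoking call().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class LazySketch {

    static final AtomicInteger EXECUTIONS = new AtomicInteger();

    // Assembling the callable is cheap: the body does not run yet,
    // just like Mono.fromCallable does not run until subscription.
    static Callable<String> findCat(long id) {
        return () -> {
            EXECUTIONS.incrementAndGet();
            return id == 1L ? "Lania" : id == 2L ? "Mit" : null;
        };
    }

    public static void main(String[] args) throws Exception {
        Callable<String> pipeline = findCat(1L);
        System.out.println("before call: " + EXECUTIONS.get()); // prints 0
        System.out.println(pipeline.call());
        System.out.println("after call: " + EXECUTIONS.get());  // prints 1
    }
}
```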
We are now going to have a look at the zip method provided by Mono:
@Override
public Mono<CatDto> getFullCatById(Long id) {
return catRepository.getCatById(id).map(cat -> {
CatDto catDto = CatConverter.toDto(cat);
return Mono.zip(ownerRepository.getOwnersByIds(cat.getFormerOwners())
.map(owners -> {
owners.forEach(owner -> catDto.getFormerOwners().add(OwnerConverter.toDto(owner)));
return owners;
})
.subscribeOn(Schedulers.parallel()), careCenterRepository.getCareCentersByIds(cat.getCareCenters())
.map(careCenters -> {
careCenters
.forEach(careCenter -> catDto.getCareCenters().add(CareCenterConverter.toDto(careCenter)));
return careCenters;
})
.subscribeOn(Schedulers.parallel()), (owners, cares) -> catDto)
.subscribeOn(Schedulers.parallel());
}).flatMap(Mono::from).subscribeOn(Schedulers.parallel());
}
This is the method getFullCatById. We are getting a full cat!
A full cat in this code means that we'll get a cat by doing three essential requests in a reactive way:
- Collect the Cat data
- Collect the Owner data
- Collect the CareCenter data
Points 2 and 3 need to run in parallel.
However, both of them depend on the Cat data, which we must get first.
This is how we want to do this:
The original web framework included in the Spring Framework, Spring Web MVC, was purpose-built for the Servlet API and Servlet containers. The reactive-stack web framework, Spring WebFlux, was added later in version 5.0. It is fully non-blocking, supports Reactive Streams back pressure, and runs on such servers as Netty, Undertow, and Servlet 3.1+ containers.
Both web frameworks mirror the names of their source modules (spring-webmvc and spring-webflux) and co-exist side by side in the Spring Framework. Each module is optional. Applications can use one or the other module or, in some cases, both — for example, Spring MVC controllers with the reactive WebClient.
The zip method from Mono allows us to start two publishing callbacks separately, in parallel.
It can be used with many types and in many different ways.
In our case we are going to zip two Mono's together.
The first step is to get our Cat out of the database.
With the instruction catRepository.getCatById(id).map(cat -> { we get our cat from the database.
Recalling the JSON data we are using as our database, we know that our cats only hold references to their owners and care centers.
This is exactly what happens in a real-life ER model database.
In our example, this data lives in different systems, so we cannot benefit from annotations like @ManyToOne, @ManyToMany or @OneToMany.
These annotations are available via JPA/Hibernate and work very well with a single database, but we cannot use them in a multi-tiered system.
Now that we have our Cat, we can benefit from the zip method.
For this we call return Mono.zip(ownerRepository.getOwnersByIds(cat.getFormerOwners()).
Then we have to add two Mono's as parameters.
These two Mono's, plus the Mono they are being called from, form a composed Mono, which can be handled as a single Mono publisher and returned as such.
Our first Mono is this:
ownerRepository.getOwnersByIds(cat.getFormerOwners())
.map(owners -> {
owners.forEach(owner -> catDto.getFormerOwners().add(OwnerConverter.toDto(owner)));
return owners;
})
.subscribeOn(Schedulers.parallel())
In this method we are letting the running thread know that we are going to get all the Owner's and create DTO's with them.
We are also letting it know that, when Netty subscribes, it will subscribe on parallel schedulers.
This is what the Schedulers.parallel() instruction is for.
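A rough plain-JDK analogue of what subscribeOn(Schedulers.parallel()) arranges is sketched below, assuming only the standard library: the work is described first and only later handed to a pool thread, so the caller's thread never runs the mapping itself.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class ScheduleSketch {

    // Describe the work without running it, then "subscribe" it on a pool,
    // roughly what subscribeOn(Schedulers.parallel()) does for a Mono.
    static String runOnPool(Supplier<String> work) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<String> future = pool.submit(
                    () -> work.get() + " on " + Thread.currentThread().getName());
            return future.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOnPool(() -> "owners loaded"));
    }
}
```

The real scheduler is of course more sophisticated: it is shared, sized to the CPU count, and its threads carry Reactor's NonBlocking marker, which is what BlockHound later relies on.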
We add all the Owner's that we find to the catDto we want to return.
The second Mono follows the same pattern, and we use it to get our CareCenter's.
careCenterRepository.getCareCentersByIds(cat.getCareCenters())
.map(careCenters -> {
careCenters
.forEach(careCenter -> catDto.getCareCenters().add(CareCenterConverter.toDto(careCenter)));
return careCenters;
})
.subscribeOn(Schedulers.parallel())
We add all the CareCenter's that we find to the catDto we want to return.
Finally, we instruct the resulting Mono to also run in parallel:
.subscribeOn(Schedulers.parallel()), (owners, cares) -> catDto).subscribeOn(Schedulers.parallel());
Maybe you have noticed it or maybe not, but at this point we would actually be returning a Mono<Mono<CatDto>> publisher instead of what we want.
This is because our zip call is running within yet another Mono.
WebFlux has a very interesting way of dealing with this, which is to run:
}).flatMap(Mono::from).subscribeOn(Schedulers.parallel());
By using flatMap, we are making sure that our Mono<Mono<CatDto>> is flattened into a Mono<CatDto> instance.
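The flattening step has an exact counterpart in plain Java. The sketch below, standard library only, uses Optional: map of a function that itself returns an Optional produces a nested Optional<Optional<T>>, while flatMap collapses it to one level, exactly the role flatMap(Mono::from) plays for our nested Mono.

```java
import java.util.Optional;

public class FlattenSketch {

    static Optional<String> lookupName(int id) {
        return id == 1 ? Optional.of("Lania") : Optional.empty();
    }

    public static void main(String[] args) {
        Optional<Integer> catId = Optional.of(1);

        // map keeps the wrapper returned by the function: nested container.
        Optional<Optional<String>> nested = catId.map(FlattenSketch::lookupName);

        // flatMap removes one level of nesting: a single container.
        Optional<String> flat = catId.flatMap(FlattenSketch::lookupName);

        System.out.println(nested); // Optional[Optional[Lania]]
        System.out.println(flat);   // Optional[Lania]
    }
}
```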
At the end of our CatServiceImpl we still have the following two methods:
@Override
public Flux<CatDto> getFullAllCats() {
return Flux.merge(getFullCatById(1L), getFullCatById(2L));
}
@Override
public Flux<CatDto> getAllCats() {
return Flux.merge(getCatById(1L), getCatById(2L));
}
This is an illustration of a possible use of the merge method of the Flux publisher.
This method receives an array in the form of a varargs argument.
This way, we are allowed to pass several Mono's to this method.
The result is a Flux.
On this occasion we definitely make use of the Flux in its most common form, which is as a stream of different objects.
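A plain-JDK way to picture this merging is the sketch below, standard library only: several single-value asynchronous sources are merged into one stream of results, the way merge turns a varargs of Mono's into one Flux. Unlike a real Flux.merge, this sketch waits for every source eagerly.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

public class MergeSketch {

    // Merge several single-value async sources into one stream of results.
    @SafeVarargs
    static List<String> merge(CompletableFuture<String>... sources) {
        return Stream.of(sources)
                .map(CompletableFuture::join) // eager, unlike a real Flux.merge
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(merge(
                CompletableFuture.supplyAsync(() -> "Lania"),
                CompletableFuture.supplyAsync(() -> "Mit")));
    }
}
```

The eagerness is also why a real Flux.merge emits items in completion order rather than subscription order, which explains the cat ordering we will see in the endpoint output below.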
4. REST endpoints test
Let's have a quick run over what we have studied so far.
4.1 Returning a simple Mono
We have seen this in the method getCatById of the CatServiceImpl:
curl http://localhost:8080/cats/1
{
"name": "Lania",
"color": "orange",
"species": "Katachtigen",
"pattern": "stripped",
"age": 4,
"formerOwners": [],
"careCenters": []
}
Let's try for another cat:
curl http://localhost:8080/cats/2
{
"name": "Mit",
"color": "black and white",
"species": "Katachtigen",
"pattern": "spotted",
"age": 9,
"formerOwners": [],
"careCenters": []
}
4.2 Returning the complete cat info
Let's now see what our full method, getFullCatById, returns for Cat 1:
curl http://localhost:8080/cats/full/1
{
"name": "Lania",
"color": "orange",
"species": "Katachtigen",
"pattern": "stripped",
"age": 4,
"formerOwners": [
{
"name": "Eng. João Esperancinha",
"addres": "Verrekijkers"
}
],
"careCenters": [
{
"name": "Nieuwegein Kitten Center",
"address": "Kittenstraat",
"refNumber": "23ABC",
"city": "Nieuwegein",
"postCode": "9999CC",
"country": "Nederland"
}
]
}
And now for cat 2:
curl http://localhost:8080/cats/full/2
{
"name": "Mit",
"color": "black and white",
"species": "Katachtigen",
"pattern": "spotted",
"age": 9,
"formerOwners": [
{
"name": "Eng. Erwin Zieligofski",
"addres": "The swamp"
}
],
"careCenters": [
{
"name": "Nieuwegein Kitten Center",
"address": "Kittenstraat",
"refNumber": "23ABC",
"city": "Nieuwegein",
"postCode": "9999CC",
"country": "Nederland"
}
]
}
4.3 All cats
First we look at the simple way of getting the Cat's info, using the method getAllCats:
curl http://localhost:8080/cats
[
{
"name": "Lania",
"color": "orange",
"species": "Katachtigen",
"pattern": "stripped",
"age": 4,
"formerOwners": [],
"careCenters": []
},
{
"name": "Mit",
"color": "black and white",
"species": "Katachtigen",
"pattern": "spotted",
"age": 9,
"formerOwners": [],
"careCenters": []
}
]
We get no info about the owners, nor do we get info about the care centers.
Now let's see what happens with getFullAllCats:
curl http://localhost:8080/cats/full
[
{
"name": "Mit",
"color": "black and white",
"species": "Katachtigen",
"pattern": "spotted",
"age": 9,
"formerOwners": [
{
"name": "Eng. Erwin Zieligofski",
"addres": "The swamp"
}
],
"careCenters": [
{
"name": "Nieuwegein Kitten Center",
"address": "Kittenstraat",
"refNumber": "23ABC",
"city": "Nieuwegein",
"postCode": "9999CC",
"country": "Nederland"
}
]
},
{
"name": "Lania",
"color": "orange",
"species": "Katachtigen",
"pattern": "stripped",
"age": 4,
"formerOwners": [
{
"name": "Eng. João Esperancinha",
"addres": "Verrekijkers"
}
],
"careCenters": [
{
"name": "Nieuwegein Kitten Center",
"address": "Kittenstraat",
"refNumber": "23ABC",
"city": "Nieuwegein",
"postCode": "9999CC",
"country": "Nederland"
}
]
}
]
5. A forged identity case
Let's picture this case in the form of a story:
Erwin Zieligovsky was a team leader in a production center.
One of the things that Erwin hated the most was being called Edwin.
Do you see the difference? It's the small d.
This d would frequently enrage Erwin, up to the point that, with the powers given to him, he would be responsible for people getting fired.
All of this because of that "offensive" d instead of the "heavenly" r.
Erwin was also known for having fits and rage attacks, especially around cats.
Erwin hated cats.
The curious thing though is that Erwin loved to hate cats and loved to show superiority even to cats.
The good thing though, is that the Cat care houses knew about Erwin and so he was always denied access when applying to adopt a cat.
One day, when renewing his identity card, he realized that the government had made a mistake.
Erwin was now called Edwin!
It was on his identity card!
After fits of anger, Erwin thought about one thing.
With the new identity card, he would get another chance of adopting a kitten!
Erwin knew about the Cat care centers and how their software wasn't safe.
Let's see how Erwin tried to do this.
In the OwnerServiceImpl class, there is a new method called checkLiability:
@Override
public Mono<String> checkLiability(String name) {
return ownerRepository.checkLiability(name);
}
In the repository class OwnerRepositoryImpl, it becomes more complex:
@Override
public Mono<String> checkLiability(String address) {
return Flux
.fromStream(() -> Arrays.stream(owners).filter(owner -> owner.getAddress().equalsIgnoreCase(address)))
.reduce((owner, owner2) -> {
if (owner.getRating() > owner2.getRating()) {
return owner2;
}
return owner;
}).map(owner -> owner.getRating() <= 0 ?
"NOK" :
"OK");
}
So what we are doing here is checking whether the owner is liable to have a cat.
Since Owner's are registered forever in the system, we can check if the addresses match.
The way it works is that we first find out if there are any owners at that address.
If there are, we check whether their rating has dropped to 0 or below.
If so, the owner gets a "NOK". If not, the owner gets an "OK".
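The same decision logic can be expressed with a plain java.util.stream pipeline, which is a useful way to sanity-check the reactive version. The sketch below is standard library only, with a hypothetical trimmed-down Owner record; min(...) replaces the reduce that keeps the lowest-rated owner.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;

public class LiabilitySketch {

    // Hypothetical trimmed-down owner: just an address and a rating.
    record Owner(String address, int rating) {}

    // Find the lowest-rated owner at the address; NOK if that rating is <= 0.
    static Optional<String> checkLiability(Owner[] owners, String address) {
        return Arrays.stream(owners)
                .filter(owner -> owner.address().equalsIgnoreCase(address))
                .min(Comparator.comparingInt(Owner::rating))
                .map(owner -> owner.rating() <= 0 ? "NOK" : "OK");
    }

    public static void main(String[] args) {
        Owner[] owners = {
                new Owner("Verrekijkers", 5),
                new Owner("The swamp", -3)
        };
        System.out.println(checkLiability(owners, "Verrekijkers")); // Optional[OK]
        System.out.println(checkLiability(owners, "The Swamp"));    // Optional[NOK]
    }
}
```

The reactive version differs in one important way: the stream here is evaluated on the spot, while the Flux/Mono pipeline only runs once something subscribes.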
We can test this also via unit tests:
class OwnerRepositoryImplTest {
private final OwnerRepository ownerRepository = new OwnerRepositoryImpl(new ObjectMapper());
OwnerRepositoryImplTest() throws IOException {
}
@Test
void givenLiable_checkLiability_Ok() {
Mono<String> verrekijkers = ownerRepository.checkLiability("Verrekijkers");
final String block = verrekijkers.block();
assertThat(block).isEqualTo("OK");
}
@Test
void givenNonLiable_checkLiability_NOk() {
Mono<String> the_swamp = ownerRepository.checkLiability("The Swamp");
final String block = the_swamp.block();
assertThat(block).isEqualTo("NOK");
}
}
The point of this last bit is to see how we can use lambdas to implement publishers with extra functionality, just like we already do with plain Java lambdas.
And so, contrary to Erwin's expectations, the new identity card with Edwin on it didn't save the day.
So now poor Erwin will have to renew his card and still live frustrated, because once again he was denied the right to adopt a cat.
6. Testing with BlockHound
BlockHound is a relatively recent library, available on GitHub.
Development of this library on GitHub started at the end of 2018.
BlockHound uses markers placed on threads to determine whether they are blocking or not:
BlockHound will transparently instrument the JVM classes and intercept blocking calls (e.g. IO) if they are performed from threads marked as "non-blocking operations only" (ie. threads implementing Reactor's NonBlocking marker interface, like those started by Schedulers.parallel()). If and when this happens (but remember, this should never happen!😜), an error will be thrown.
The first step is to add the necessary libraries to our project.
First we need the test framework for Spring Boot:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<version>2.2.6.RELEASE</version>
</dependency>
Given that we are running JUnit tests, we need the JUnit support:
<dependency>
<groupId>io.projectreactor.tools</groupId>
<artifactId>blockhound-junit-platform</artifactId>
<version>1.0.2.RELEASE</version>
<scope>test</scope>
</dependency>
Finally we need the Blockhound core:
<dependency>
<groupId>io.projectreactor.tools</groupId>
<artifactId>blockhound</artifactId>
<version>1.0.2.RELEASE</version>
<scope>test</scope>
</dependency>
This way we can test all our calls in class CatControllerImplTest:
@SpringBootTest(webEnvironment = RANDOM_PORT)
@ExtendWith(SpringExtension.class)
class CatControllerImplTest {
final RestTemplate restTemplate = new RestTemplate();
@Autowired
private CatControllerImpl catController;
@LocalServerPort
private int port;
static {
BlockHound.install();
}
@Test
void getCatByIdI_whenCall_nonBlocking() {
Mono.delay(Duration.ofMillis(1))
.doOnNext(it -> catController.getCatByIdI(1L))
.block();
}
@Test
void getFullCatById_whenCall_nonBlocking() {
Mono.delay(Duration.ofMillis(1))
.doOnNext(it -> catController.getFullCatById(1L))
.block();
}
@Test
void getAllCats_whenCall_nonBlocking() {
Mono.delay(Duration.ofMillis(1))
.doOnNext(it -> catController.getAllCats())
.block();
}
@Test
void getFullAllCats_whenCall_nonBlocking() {
Mono.delay(Duration.ofMillis(1))
.doOnNext(it -> catController.getFullAllCats())
.block();
}
(...)
We need to pay special attention to this section of the code:
static {
BlockHound.install();
}
This is where BlockHound gets installed in order to be able to run the unit tests.
There are multiple ways of configuring this.
For this article we only need BlockHound.install().
Please bear in mind that BlockHound is a Java agent installed at the JVM level:
BlockHound is a Java Agent.
It instruments the pre-defined set of blocking methods (see customization) in the JVM and adds a special check before calling the callback (see Blocking call decision).
All of these @Test's follow the same test procedure.
Taking getFullAllCats_whenCall_nonBlocking as an example:
@Test
void getFullAllCats_whenCall_nonBlocking() {
Mono.delay(Duration.ofMillis(1))
.doOnNext(it -> catController.getFullAllCats())
.block();
}
The Duration.ofMillis(1) is just a standard way of setting up the BlockHound check.
There doesn't seem to be any difference in the unit test runtime if we change the ofMillis parameter.
What does make a difference is the call to block().
Once we call it, we trigger the check that determines whether our method is blocking or not.
This same procedure applies to our OwnerControllerTest:
@Test
void checkLiability_whenCallPositive_nonBlocking() {
Mono.delay(Duration.ofMillis(1))
.doOnNext(it -> ownerController.checkLiability("Verrekijkers"))
.block();
}
@Test
void checkLiability_whenCallNegative_nonBlocking() {
Mono.delay(Duration.ofMillis(1))
.doOnNext(it -> ownerController.checkLiability("The swamp"))
.block();
}
These two extra tests check whether our checkLiability calls are blocking or not.
We are almost at the end of our article, but before we continue to the conclusion we still need to see the difference.
At the moment it is probably not clear what the difference is, in practical terms, between making a blocking call and a reactive call.
If we write a unit test with the code described above, we will realize that BlockHound still does not detect a potential blocking method.
We need to test this somehow.
In our example, we are using the file system as our database.
Unfortunately for our experiments, we run into a few caveats here:
- Accessing the file system doesn't necessarily mark the thread as blocking.
- Using Flux or Mono doesn't automatically make the method non-blocking.
- Using wait or Thread.sleep automatically marks the method as blocking, regardless of whether it runs with WebFlux or not.
However, we do know that BlockHound will not generate errors if we just run the detection procedure while our simulated calls are implemented in the publisher instead of the main callback.
For this, I have created two extra methods in the CatControllerImpl class:
public Flux<CatDto> getFullAllCatsReactiveForTest() {
return this.catService.getFullAllCats().map(catDto -> {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return catDto;
});
}
public List<CatDto> getFullAllCatsNonReactive() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return this.catService.getFullAllCatsNonReactive();
}
The first method, getFullAllCatsReactiveForTest, is a reactive method.
Our unit tests won't check its result.
Instead, we are only going to use BlockHound to see whether the method is blocking or not.
The second method, getFullAllCatsNonReactive, is a blocking method.
The Thread.sleep occurs within the callback scope.
This means that Netty will only deliver a plain request response after waiting for 1 millisecond.
To make this clearer, let's look at the unit tests in class CatControllerImplTest that will help us understand this:
@Test
void getFullAllCatsNonReaciveForTest_whenCall_Blocking() {
Mono.delay(Duration.ofMillis(1))
.doOnNext(it -> catController.getFullAllCatsReactiveForTest())
.block();
}
@Test
void getFullAllCatsNonReacive_whenCall_Blocking() {
Mono.delay(Duration.ofMillis(1))
.doOnNext(it -> catController.getFullAllCatsNonReactive())
.block();
}
Now the second unit test is failing.
This is the Error we should be getting at the moment:
reactor.core.Exceptions$ReactiveException: reactor.blockhound.BlockingOperationError: Blocking call! java.lang.Thread.sleep
at reactor.core.Exceptions.propagate(Exceptions.java:393)
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:97)
at reactor.core.publisher.Mono.block(Mono.java:1666)
at org.jesperancinha.housing.controller.CatControllerImplTest.getFullAllCatsNonReacive_whenCall_Blocking(CatControllerImplTest.java:77)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:675)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:125)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:132)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:124)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:74)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:104)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:62)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:43)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:35)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:202)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:198)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1510)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1510)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:229)
at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:197)
at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:211)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:191)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
... 65 more
Caused by: reactor.blockhound.BlockingOperationError: Blocking call! java.lang.Thread.sleep
at java.base/java.lang.Thread.sleep(Thread.java)
at org.jesperancinha.housing.controller.CatControllerImpl.getFullAllCatsNonReactive(CatControllerImpl.java:51)
at org.jesperancinha.housing.controller.CatControllerImplTest.lambda$getFullAllCatsNonReacive_whenCall_Blocking$5(CatControllerImplTest.java:76)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:177)
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:117)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
The way to fix this unit test is simply to make it expect this Error:
@Test
void getFullAllCatsNonReacive_whenCall_Blocking() {
assertThatExceptionOfType(BlockingOperationError.class).isThrownBy(() -> Mono.delay(Duration.ofMillis(1))
.doOnNext(it -> catController.getFullAllCatsNonReactive())
.block());
}
This was our last failing unit test, and it concludes our exploration of Reactive Programming with WebFlux and testing it with BlockHound.
7. Conclusion
This article was written with a lot of positivity to highlight the basic ways in which to use Reactive programming with WebFlux.
All code for this example is available in my GitHub repo: Kitten House Care.
I hope that you enjoyed this article as much as I enjoyed writing it.
If you enjoyed it, please share it in the links below.
Take care, stay interested, stay logical, stay safe!
If you are interested in Reactive programming, you'll probably find the following slides very useful:
Reactive Programming With S... by João Esperancinha
or you may find the same presentation on slide-share:
Reactive programming with Spring Webflux.pptx from João Esperancinha
8. Resources
8.1. Books and papers
- Clojure Reactive Programming by Leonardo Borges
- Dataflow and Reactive Programming Systems
- First Version of a Data Flow Procedure Language
- "Daisy Bell (Bicycle Built for Two)" by Max Mathews, John L. Kelly, Jr., and Carol Lochbaum (1961)
- Dennis Talk
8.2. Websites
- What is the Reactive Manifesto?
- Dataflow and Reactive Programming Systems
- Kitten House Care
- Full Stack Reactive with React and Spring WebFlux
- Project BlockHound
- Sea Shell Archiver Source Code
- Concert Demos Source Code
- Dennis Talk @ MIT
- Jack Dennis @ Wikipedia
- Reactive Manifesto
- Design Patterns - Observer
If you enjoyed this presentation, you probably also would like to learn more about reactive programming. I have created a tutorial about best practices on creating a reactive web application with Kotlin coroutines and you can find the 3 episode set over here:
Or, if you prefer to watch them all in one go, they are also available as a single video on DailyMotion:
Top comments (3)
Reactive streams have several issues, which limit their usability:
Overall reactive streams are quite complicated and inconvenient, despite significant resources invested into development and tuning of various implementations.
I don't disagree. The real benefit of reactive seems to be resilience. Having said that, even with coroutines, the level of complexity may outweigh the benefits in many cases. It is really a question of trying per case and see if it performs well. Benchmarking and performance tests are crucial to make correct decisions. The concepts of "cold" and "hot" streams may feel complicated as you mention and in many cases, they are. What reactive streams don't do is something that I often see defended and it is that "they are faster!". That's an odd concept. They may perform better or worse than the traditional blocking MVC model, but in any case, the resistance to backpressure should most of the times be higher with reactive streams. Even if it is just a little bit.
I think that there should be no complexity barrier and asynchronous processing should be as simple and convenient to use as synchronous code. This is achievable by using functional style Future/Promise API.