DEV Community

Cover image for Reactive Programming applied to Legacy Services — A WebFlux example
João Esperancinha
João Esperancinha

Posted on

Reactive Programming applied to Legacy Services — A WebFlux example

1. Introduction

The concept of reactive programming can be traced way back to the mid-60s. It’s a declarative programming model or programming paradigm that is mainly concerned with the handling of asynchronous data streams and the propagation of data through transformations with certain determined orders of execution. The term was invented by Erik Meijer probably around 2010. The history of reactive programming isn’t very clear as to where it started. However, we can see references to it when in old literature we find terms like “data flows”. These ended up being what we for a long time have been used to call and refer to as streams. Data flows as such were a part of another programming paradigm then called dataflow programming. This term was invented by Jack Dennis and his graduate students in the 1960s at MIT. Reactive programming is sometimes referred to as Dataflow programming on steroids. If you’d like to know more about the history of Reactive programming, I placed a lot of links in the resource repo of this article. Please have a look at them at the end of this article.

2. Reactive Programming in General

Reactive programming has many implementations, and they are all based on the observer pattern. In reactive programming terms, this is also known as the publisher-subscriber pattern. Before the publisher, there is a producer of events. This essentially is the bridge between the triggered event and the publisher. Once a request comes in, the producer triggers the event handler. The publisher is an implementation of that. It processes and emits data without any knowledge of the receiver. The receiver is the entity that knows that data is coming through via a specific channel. It knows the type of data coming in. The receiver is known as the subscriber of that data and it basically waits for the data to be delivered. A subscriber is subscribed to a publisher.

3. Case study

3.1. Notes

In this article, I’m explaining briefly what Reactive Programming looks like and what it is, before making a deep dive into the code. We will not need any major technological background to understand it. I am going to show an example, and we will try to see and identify the Reactive principles as we go along. In this article, I will go through problems found along with implementing reactive programming and the possible solutions. I will describe the pros and cons of each implementation. At the bottom of this tutorial, we will find the most plausible solution I found for the case I’m presenting.

3.2. Introduction

In many current running services across the globe, many companies are still using services from quite a long time ago. Some systems even still use the 8-inch (ca. 20 cm), 80Kb disks. For these systems, the software is very outdated, very old, and the implications of changing it grossly outweigh the risks. This usually happens mostly due to economic and workload constraints. Applications for the military, airports, air companies, and banks sometimes are, more often than not, very much outdated. We have seen news of Airports still using Windows 3.11 and banks using COBOL. This is also a reason why software manufacturers are so busy with application transformations and the spread of the 12-factor application philosophy. They are offering other solutions to bring systems up to date to current realities and challenges. Many times this just cannot be done in one step, no matter how inspired we may feel about it.
Let’s have a look at bank examples or even telecommunication companies. In my experience with Telecom and Banks, they many times do still work with SOAP services, WSDL service definitions, XSD’s, XLTS’s, and so forth. These are mostly inherited from systems like Siebel and SAP. From a technological perspective SOAP really only has the downside of giving a lot of overhead and package load. For the rest everything is debatable. But this isn’t the problem I want to zoom in. If these SOAP services were efficient, resilient, scalable, responsive, and fast enough, then discussions about them would really just boil down to aesthetics rather than anything else. Truth is that many times their design doesn’t really allow much flexibility and working around them can seriously complicate our development. Not only that, frequently SOAP services are designed with many dependencies in mind, many interdependent keys, different ports and methods, different types with the same name, and many other intricacies also seen in typical REST JSON-based services.
In order to avoid any personal relation to any company I’ve worked within in the past, I have decided to make up an unrelated example. Let’s say we have a population where people live in different Shells. We call this our root element. Every shell has a slogan. In my example, there are 16 slogans, and they are all sentences from Cardi B’s rap solo in Maroon 5’s Girls Like You. Shells will have people and will gather costumes. For people, I picked up 16 different characters from Game of Thrones. Each costume has a top and a lower and that refers to the top clothing like a T-shirt and the lower clothing like some trousers. Each person will have a costume and an account with a value. Take note that I am not concerned in this example with validation, repeated data, or circular data. We are going to investigate the complexity of making sub-requests of a request in a Reactive Programming web application.
This is a summarized overview of what we are going to look at:

blogcenter

In the above picture, we can see that there is an event loop that handles the events. The events are produced by the REST controller. Afterward, they are handled by the event handler of the publisher. All our SOAP calls and requests are made here. At this point, we have left the servlet thread.
One way to see this is that in our built-in application and at this point, we won’t be able to break any execution with any breakpoint outside the programmed consumers or any of the handlers. One way to look at this is by understanding that in a Reactive MVC model, our model is not returning data. Instead, it returns a publisher that later responds back to the subscriber in its own thread and then lets the subscriber reply back to the browser.
The event has just been triggered. At that point, our REST controller has already built a publisher (Flux or Mono) and our service is now executing it. Netty is implementing our subscriber to execute the callback to the client via HTTP. We can only have one subscriber per publisher, but we can have many consumers that can be triggered in independent threads via a doOnNext method consumer implementation for example. We can find many other alternatives to doOnNext in the Web-Flux API. A publisher allows us to apply transformations in a similar way that lambdas do. We also have map and flatMap methods for example. This pipeline represents a series of declarative code transformations. We can apply it sequentially to our data. One of the very important things to remember is that nothing really happens until we have a subscriber. This is a very analogous thought to the idea of terminal operations in Java Lambdas. In our example application, we won’t find any explicit subscribers in our code. This is because Netty is our subscriber and its code resides underwater. If you are interested in seeing the code more deeply please investigate three important classes:

  • reactor.core.publisher.FluxMapFuseable
  • org.springframework.http.server.reactive.ChannelSendOperator
  • reactor.netty.http.server.HttpTrafficHandler You’ll be able to create a breakpoint in between them and then get a better sense of what’s happening when executing the callback to the browser. In the following example, we can see that the method findLowerById returns a Mono. This is not a value. Instead, it is a publisher that Web-Flux understands and generates a specific independent thread for it to run.
@Repository
public class ShellLowerRepositoryImpl implements ShellLowerRepository {

    private final LowersClient lowersClient;

    public ShellLowerRepositoryImpl(LowersClient lowersClient) {
        this.lowersClient = lowersClient;
    }

    public Mono<Lower> findLowerById(final Long id) {
        return Mono.fromCallable(() -> lowersClient.getLower(id))
                .subscribeOn(single());
    }

    public Lower findLowerByIdBlock(Long lowerId) {
        return lowersClient.getLower(lowerId);
    }
}
Enter fullscreen mode Exit fullscreen mode

This concludes my introduction. We will now dig into the code I’ve done and review a few major examples:

  • Data Model
  • Architecture Overview
  • Making publishers
  • Services
  • In practice
  • A reactive solution
  • An immutable reactive solution

Bear in mind that this code is specifically made for tutorial purposes and given its complexity I will only explain the relevant parts to this tutorial.
In this article, I’m not showing complete class implementations because of the fair complexity and size of the whole code. The code is in any case, available to see in more detail on GitHub.

3.3 Data model

In order to understand which data we are trying to get, let’s first have a look at our data model:

blogcenter

3.4. Making publishers

Creating publishers rely heavily on declarative programming techniques. Essentially what we are going to do is the implementation of the Observable pattern in several places of our application. In Spring Web-Flux that can be very easily done using Flow and Mono. These are two forms of Publishers. So we are going to declare what do we want to happen with our data once we receive it before our subscriber, Netty, can pick that up.
I will guide us through the ShellRepositoryImpl class and another one in our repository layer. These should cover the concepts of using legacy code in a reactive way.
All the publishers in our example are being made with code very similar to this:

public Mono<Shell> findSeaShellById(final Long id) {
    return Mono.fromCallable(() -> shellsClient.getShell(id)).subscribeOn(elastic());
}

public ParallelFlux<Shell> findAllSeaShells() {
    return findAllShellIds()
            .parallel(parallelism)
            .runOn(elastic())
            .map(shellsClient::getShell)
            .runOn(single());
}

public Flux<Long> findAllShellIds() {
    return Mono.fromCallable(shellsClient::getAllShellIds)
            .flux().flatMap(Flux::fromIterable);
}
Enter fullscreen mode Exit fullscreen mode

Looking at the findAllSeaShells method, we easily can see that the goal of this method is to retrieve all the shells in the repo. In this code, we make use of the shellsClient client. This client communicates with one webservice specifically responsible for the retrieval of SeaShells data. The first method in the lambda chain is the findAllShellIds method. The main highlight of this code above is the Mono.fromCallable. This is the function to use if we intend to call a method from a black-box. We call it black-box, because we know that, further down the line, we retrieve all Ids from an unknown implementation. We just know that we get a list of Ids but we are not interested in how it is implemented. The latter is the job of our legacy code . Our black boxes are our SOAP clients. The SOAP implementation simulates our unknown services and the clients are the adapters we’ve created. In such situations, we won’t be able to completely predict what and how the client responds. From the API we can read precisely how it works. From here we can see that the biggest advantage is that we can program this Mono to return null if it fails: “If the Callable resolves to null, the resulting Mono completes empty.”. Mono publishers are used in representing single objects. Flux publishers are used for lists of objects. The fromCallable method is implemented in the Mono publisher only. It would probably seem better to use the Flux.fromIterable method. However, by allowing a conversion from Mono to Flux, we are letting the WebFlux framework optimize the later execution and conversion from a List object to individual objects to the subscriber. A direct Flux.fromIterable conversion would skip that optimization step. This conversion to Flux is explicitly implemented with a flatMap method. The functions we use are very similar to Java Lambdas. We create them in a declarative way. Essentially we are declaring the chain of events that is going to happen using lambdas. Now that the publisher knows it needs to get a list of IDs, we need to map them to actual shells. At this point, we still preserve a one-to-one relationship. In other words for each Id, we have a shell. Therefore we have it very clear that we need to wait for the Id in order to process our following request. By default, the processing is sequential. We need parallel processing because we want our application to be as reactive and responsive as possible. As a consequence, we need to explicitly define in Flux that we are going to get our shells in a parallel fashion. This is where parallel comes into play. It is configurable with a custom parameter called sea.shell.parallelism. Here we can configure the maximum parallel threads we want to create. For Flux this is still not enough. After this, we need to specify the type of thread management we want and for this, we have different types available. In my example, I am using the elastic type. This is ideal because we are dealing with legacy code which potentially can be very slow. At the same time, in my example, we can have unlimited threads generated per person in shell request. From the API we have this definition: “Optimized for longer executions, an alternative for blocking tasks where the number of active tasks (and threads) can grow indefinitely”. Following this, we will get our Id. Notice that I am still talking in the future tense. We are still declaring code and nothing is happening. With our Id, we will be able to map that stream to another transformation. We will then get our actual Shell. This is our instance coming out of our soap client. Since we are running everything in parallel and it’s a small request can safely determine that our Scheduler is of a single type. From the definition, we have “Optimized for fast Runnable non-blocking executions”. Notice that because of all of the above our Flux is now of a ParallelFlux type.

All the other methods presented in this implementation are simple methods implemented in a very Naif way to get the data in a blocking way. Further in this article, we’ll have a look at differences in behavior.

3.5. Services Implementations

We’ll now look at different examples for our Services implementation. The idea here, to look at four different solutions, which then will be further subdivided into more sub-groups. Essentially we’ll be looking at a really bad solution and we’ll gradually improve to a better one. These implementations comprise the following implementation scenarios:

  • traditional MVC approach
  • mutable solution with loose running subscribers
  • wrapping a blocking call
  • awaiting for the loose subscribers
  • wrapping futures and fork-joins
  • one by one alternative
  • mutable solution with coordinated publishers
  • immutable solution with coordinated publishers

I have prepared three service types in the module sea-shell-rest-service. SeaShellServiceImpl, SeaShellReactiveOneServiceImpl and SeaShellReactiveServiceImpl. They are all bad, but I think it’s important to delve into this because we’ll see different behaviors we want to avoid. All of them are different implementations of examples we could use to handle our data. The first implements a series of methods using a naif approach to getting our data all at once using futures and fork joins..The second is an example of how we could use a microservice approach, by making the client complete the chain. Finally, the last one is a complete reactive implementation using nothing but chained publishers. These are fundamental examples that will help us understand how Reactive Programming works. We will also gain a clear idea of the problems that we may face in seemingly “obvious and easy” solutions. The last is the example which in the end makes more sense to me to use. It resides in a separate module called sea-shell-service-immutable. I will also explain the reasons why I came to that conclusion.
Let’s start by having a look SeaShellServiceImpl:

@Value("${sea.shell.parallelism:20}")
private Integer parallelism;

@Value("${sea.shell.delay.ms:100}")
private Integer delay;
Enter fullscreen mode Exit fullscreen mode

As we can see we have two inject properties: parallelism and delay. We already know what parallelism is. In regards to delay, this comes from a property called sea.shell.delay.ms in the application.properties file. We will see in detail what this property is about, but for the time being let’s look at all of the presented solutions. In this class, I’m using the naif word to represent the blocking methods. However, we can look at all of them as naif implementations. Let’s have a look at the methods implementations of this service. I will then guide us through the practical example with the running application. We’ll have a look at what we can learn from these examples.

3.5.1. Example 1— Using Web Mvc (traditional MVC approach) — module sea-shell-rest-service

The blocking MVC approach to this problem doesn’t use any publishers in its implementation. In this method, we’ll basically perform all necessary queries by asking and waiting for the response. This uses the traditional Spring MVC architecture and its purpose in this exercise is to show us how this approach differs from a Reactive Programming approach. Note that I could have used here ForkJoin’s, CompletableFuture’s, Future’s, Executor’s, etc, to trigger parallelism. This is a more complicated option, which doesn’t add anything to a reactive programming way of building applications. It would still force us to wait for all sub-threads to finish before letting go of the main thread. It is represented by methods getAllSeaShellsNaifBlock, getSeaShellNaifBlock and setMainRootElements. These are all imperative programming examples:

public List<SeaShellDto> getAllSeaShellsNaifBlock() {
    return shellRepository.findAllSeaShellsBlock()
            .parallelStream()
            .map(SeaShellConverter::toShellDto)
            .peek(this::setMainRootElements)
            .collect(toList());
}

public SeaShellDto getSeaShellNaifBlock(Long id) {
    final SeaShellDto seaShellDto = SeaShellConverter
            .toShellDto(shellRepository
                    .findSeaShellBlockById(id));
    setMainRootElements(seaShellDto);
    return seaShellDto;
}
Enter fullscreen mode Exit fullscreen mode

However further in this article, I will discuss precisely the ForkJoin method and why it’s very unlikely that we would be able to make the method non-blocking in this way.

3.5.2. Example 2— Using consumers (mutable solution with loose running subscribers) — module sea-shell-rest-service

Let’s look at getShellById in SeaShellServiceImpl:

public Mono<SeaShellDto> getSeaShellById(Long id) {
        return shellRepository.findSeaShellById(id)
                .map(SeaShellConverter::toShellDto)
                .doOnNext(consumerPersons())
                .doOnNext(consumerCostumes());
}
Enter fullscreen mode Exit fullscreen mode

It gets a shell, creates its matching Dto and then tries to change the Dto further using a chain of consumers. If we look inside any of the consumer methods we’ll see an implementation of a subscriber. This subscriber is the last destination of the data being sent through. Notice also that the data type doesn’t change through the doOnNext methods. If we look in detail into the implementation we see that I’m filling up all the chained node tree of a Shell with its Person’s, Costume’s, Account’s, Top’s and Lower’s. Essentially I’m trying to get a complete Shell across and returning a ShellDto. Unfortunately, although the idea behind it seems good, it goes against Reactive Programming principles and it does not guarantee that the objects are returned completely filled with data anyways. It goes against Reactive Programming principles because we are thinking about waiting for the Person and Costumes and all its underlying needed methods to make all their calls and fetch results. This may or may not happen depending on the time the doOnNext is running and the time the Mono is running.

3.5.3. Example 3 — Almost reactive (wrapping a blocking call) — module sea-shell-rest-service

Looking at example 2, you may have had this idea: “Why not make the naif implementation multithreaded and then wrap up that method with a publisher?” Truth is, we could, and in reality, that would indeed be a non-blocking example. We would get all the data in one go, and we would not block the main thread. Unfortunately for this solution, Reactive Programming also means fast, scalable, and more importantly very, very responsive. With this approach, we are still making the user wait, although in a different way, for all the requests to be done. Please look at the solution implemented in getAllSeaShellsReactiveBlock:

public ParallelFlux<SeaShellDto> getAllSeaShellsReactiveBlock() {
    return Mono.fromCallable(this::getAllSeaShellsNaifBlock)
            .flux().flatMap(Flux::fromIterable)
            .parallel(parallelism)
            .runOn(elastic());
}
Enter fullscreen mode Exit fullscreen mode

In this example, we moved the problem out of the main thread that waits for the soap call to be done. However, we only re-located it to the single thread in the event loop. Blockhound will confirm that however, it has become a non-blocking method. However, in this way, we are letting the responsibility of dealing with this thread blocking system on ourselves. WebFlux is only responsible to essentially wrap this otherwise blocking method.

3.5.4. Example 4 — Awaiting reactive approach(awaiting for the loose subscribers) — module sea-shell-rest-service

Take a look at the differences between getAllSeaShellsReactiveWithDelay and getAllSeaShells. See the delayElements method? From the definition “duration by which to delay each Subscriber.onNext(T) signal”. This means that each one of the signals will be delayed by a certain about of time. By default, we have established 100ms in this delay. We will see that this is enough to get complete data. Furthermore, we will also see how it actually works. Remember that being an upgrade of example 1, this is still not our solution:

public Flux<SeaShellDto> getAllSeaShellsReactiveWithDelay() {
    return getAllSeaShells()
            .sequential()
            .delayElements(ofMillis(delay))
            .subscribeOn(elastic());
}
Enter fullscreen mode Exit fullscreen mode

Looking at the above code, we can see that basically, we are just guessing the time it will take for the getAllSeaShells() to complete. Further, we are always making sure that for each request, the request will wait for that specific time. In other words, we just made a non-blocking request that takes at least a specific time to return a response.

3.5.5. Example 5 — Wrapping and Fork Joining(wrapping futures and fork-joins) — module sea-shell-rest-service

Up until here, we have made no mention of multithreading. Now we are going to do just that. In this same class let’s have a look at method getSeaShellCostumesForkJoinTask:

public Flux<SeaShellDto> getAllSeaShellsReactiveWithForkJoins() {
    return Flux.fromStream(shellRepository.findAllSeaShellsBlock().parallelStream()
            .map(shell -> {
                final ForkJoinPool commonPool = new ForkJoinPool(100);
                final SeaShellDto seaShellDto = SeaShellConverter.toShellDto(shell);
                final Stream<ForkJoinTask<SeaShellPersonDto>> personDtoStream =
                        commonPool.invoke(getSeaShellPersonsForkJoinTask(commonPool, seaShellDto));
                final Stream<ForkJoinTask<SeaShellCostumeDto>>
                        costumeDtoStream = commonPool.invoke(getSeaShellCostumesForkJoinTask(commonPool, seaShellDto));
                seaShellDto.setPersons(personDtoStream.map(ForkJoinTask::join).collect(toList()));
                seaShellDto.setCostumes(costumeDtoStream.map(ForkJoinTask::join).collect(toList()));
                return seaShellDto;
            }));
}
Enter fullscreen mode Exit fullscreen mode

In this method, we are using a from stream method. Notice that there is a ForkJoinPool. If we run the unit tests and see how they are implemented:

@Test
public void findAllCompleteSeaShellsReactiveWithForkJoins_onCall_thenBlocking() {
    assertThrows(WebServiceException.class, () -> delay(Duration.ofSeconds(1))
            .doOnNext(it -> seaShellService.getAllSeaShellsReactiveWithForkJoins())
            .block());
}
Enter fullscreen mode Exit fullscreen mode

The exception we’re catching here is WebServiceException. This exception is thrown by blockound in the chain when it finds a blocking call. We are using a publisher, the publisher is being used and yet, Blockhound is still saying that we have a blocking method. The only difference between this method and all the other methods is that we are using a ForkJoinPool within the wrapper. In this case, it seems that this happens because of where the ForkJoin pool is created. When a thread joins the commonPool, it will block the main thread. It’s as simple as that.

3.5.6. Example 6 — A reactive client (one by one alternative) — module sea-shell-rest-service

One simple way of making everything reactive is to wrap up all your methods with publishers. Not much work is involved in this solution because we are only making sure that we pass the simple publishers we made in our repositories all the way to the controllers. Our services work almost as simple proxies. This means that they do have an extra change added. This is simply the conversion of the data to a Dto. Let’s take a look at an example in the SeaShellReactiveOneServiceImpl class:

public Mono<SeaShellDto> getSeaShellById(Long id) {
    return this.shellRepository.findSeaShellById(id)
            .map(SeaShellConverter::toShellDto);
}
Enter fullscreen mode Exit fullscreen mode

In this way, you can see that the only change is a map to a ShellDto. This is one of my elected solutions to consider when thinking about creating a reactive framework around legacy services.
We can find an example of our reactive client in the future package. This client is implemented in class SeaShellWebReactiveOneClient. There is an implementation with a subscriber. In the end, we wait for 1 second in order to let the subscriber finish its work:

private void consumeReactively() throws InterruptedException {
    getAllSeaShellsReactively().subscribe(x -> log.info("BLOCK->" + x.toString()));
    Thread.sleep(1000);
}
Enter fullscreen mode Exit fullscreen mode

3.5.7. Example 7 — A fully reactive approach(mutable solution with coordinated publishers) — module sea-shell-rest-service

In some cases, we may find it more convenient to make sure that we get a few important things:

  1. Performance
  2. All data at once
  3. Keep it non-blocking
  4. Keep it simple
  5. Achieve true parallelism

Achieving an improvement in all of these points at the same points is highly unlikely. If we get all the data at once, we will reduce performance. If we want to keep it simple, then chaining everything together may work against us. To keep it non-blocking, we may be moving a lot of performance to the single thread that deals with everything. The best way to answer this is: Experience will tell us everything we need to know about these decisions. If after weighing all pro’s and con’s of taking a fully reactive approach, we are still convinced, then let’s look at how this would work.
Let’s start with the example on class SeaShellReactiveServiceImpl:

public ParallelFlux<SeaShellDto> getAllSeaShells() {
    return shellRepository
            .findAllSeaShells()
            .map(this::fetchSeaShellPublisher).flatMap(Flux::from);
}
Enter fullscreen mode Exit fullscreen mode

The fetchSeaShellPublisher is a method that returns a publisher that will start our publishing process. Notice that we are mapping a flux of SeaShells coming from our SOAP services. When mapping from a Publisher to another publisher, we will need to flatMap everything. Let’s keep this in mind before we move on. It’s important to understand that we are still only declaring and nothing has actually happened in terms of processing.
Let’s have a look at fetchSeaShellPublisher in more detail:

private Mono<SeaShellDto> fetchSeaShellPublisher(Shell seaShell) {
    final SeaShellDto seaShellDtoReturn = SeaShellConverter.toShellDto(seaShell);

    return zip(
            fetchPersonsPublisher(seaShell, seaShellDtoReturn).subscribeOn(Schedulers.parallel()),
            fetchCostumesPublisher(seaShell, seaShellDtoReturn).subscribeOn(Schedulers.parallel()),
            (persons, costumes) -> seaShellDtoReturn);
}
Enter fullscreen mode Exit fullscreen mode

See that we are using the Mono.zip method. This is one of the many ways to chain publishers together. We are going to have a look at this one in particular. Later I may make a whole article about the improvement of this implementation. What’s important for now is to see the chaining of publishers in action. With the zip method we can chain publishers together and implement its result. A very nice feature of zip is that when we subscribe to the participating publishers on a parallel scheduler, they actually work in parallel. In our data model actually have a lot of pairs. In the root of the shell we have two elements. A list of persons and a list of costumes. At the root of our costumes, we have another pair. A top and a lower. In the root of person, yet another one. An account and a costume. Notice that costume in a person follows the same data model as a person.
Let’s look at one of these two methods to see how the chain works. This is fetchPersonsPublisher:

private Mono<?> fetchPersonsPublisher(Shell seaShell, SeaShellDto seaShellDtoReturn) {
    return from(from(just(
            seaShell.getPersons()).subscribeOn(parallel()).map(shellPersonRepository::findPersonsBlock).subscribeOn(parallel())
            .map(persons -> {
                persons.forEach(person -> seaShellDtoReturn.getPersons().add(SeaShellConverter.toShellPersonDto(person)));
                return seaShellDtoReturn.getPersons();
            }))
            .thenMany(zip(
                    fetchPersonAccountPublisher(seaShellDtoReturn).subscribeOn(parallel()),
                    fetchPersonFullCostumePublisher(seaShellDtoReturn).subscribeOn(parallel()))));
}
Enter fullscreen mode Exit fullscreen mode

Take a moment to look at the thenMany method. This method sets up the first element in our person's publisher chain. We know that in a persons element we should pick up an account and a costume at the same time. As we now are aware, this is where the zip method comes in to essentially zip those two publishers together and in parallel as one single publisher! We should be able to easily interpret the rest of the code with this explanation.
When all the chains are complete, we can make sure that our thread will run through multiple publishers in a non-blocking way. The returned publisher is a result of this chaining and it will be handled independently in one separate thread without interfering with the main thread. It will of course generate different threads and it will evidently generate parallelism. The difference is that no interference with the main thread will be made.

3.5.8. Example 8— A fully reactive approach(immutable solution with coordinated publishers) — module sea-shell-service-immutable

We finally reach our perfect solution. In the same way, as in the previous example, our goal is to make a fully reactive call. We already achieved that, however, we were not able yet to remove the side-effects of our code purely because we are taking advantage of mutability. With immutability, the whole code needs to change and the paradigm changes once again radically.
In this case, the getShellsById method takes a different form:

public Mono<SeaShellDto> getSeaShellById(Long id) {
  return shellImmutableRepository.findSeaShellById(id)
    .flatMap(shell ->
      zip(
        shellPersonImmutableRepository
                .findPersons(shell.getPersons().getPersonId())
                .flatMap(person -> personPublisher(person).subscribeOn(parallel()))
                .sequential().collectList().subscribeOn(parallel()),
        shellCostumeImmutableRepository
                .findCostumes(shell.getCostumes().getCostumeId())
                .flatMap(costume -> costumePublisher(costume).subscribeOn(parallel()))
                .sequential().collectList().subscribeOn(parallel()),
        (persons, costumes) ->
                SeaShellDto.builder()
                        .name(shell.getName())
                        .scientificName(shell.getScientificName())
                        .personIds(shell.getPersons().getPersonId())
                        .costumeIds(shell.getCostumes().getCostumeId())
                        .persons(persons)
                        .costumes(costumes)
                        .build()).subscribeOn(parallel())
    );
}
Enter fullscreen mode Exit fullscreen mode

The method is considerably large, but this is mostly because of the builder we are using. We first find a Shell. Then, with that Shell, we will find all persons and costumes. Each process will run in parallel and upon completion, the resulting two lists will be used to create a new Shell with the results of the previous Shell query. If you follow the way persons and costumes are created in this project, you will see the creation of different objects. With the mutable solution, we only changed the property. With this immutable solution, we need to create a whole new object every time we need an object with a different property. Using common Java POJOS’s we would also be able to create an example like this one, but it helps a lot to have a constraint against the usage of property changes. This is why I chose to use java records for this example, and so, our new Shell POJO takes a new form:

public record SeaShellDto(
        String name,
        String scientificName,
        String slogan,
        List<SeaShellPersonDto> persons,
        List<SeaShellCostumeDto> costumes,
        List<Long> personIds,
        List<Long> costumeIds) {

    @Builder
    public SeaShellDto {
    }
}
Enter fullscreen mode Exit fullscreen mode

3.6. Controllers

Let’s run our SOAP service and our SpringBoot Application to analyze this situation.

  • Run the soap service:
cd sea-shell-soap-wiremock/sea-shell-soap-service/target
java -jar sea-shell-soap-service-0.0.0-SNAPSHOT-jar-with-dependencies.jar
Enter fullscreen mode Exit fullscreen mode
  • Run our Spring Boot service:
cd sea-shell-service-spring-web-flux/sea-shell-rest-service/target
java -jar sea-shell-rest-service-0.0.0-SNAPSHOT.jar
Enter fullscreen mode Exit fullscreen mode

Let’s study on more detail our four controllers: SeaShellControllerImpl, SeaShellReactiveControllerImpl and SeaShellReactiveReactiveOneControllerImpl. If we take a look at them, there are quite a few methods implemented. We’ll have a look at the reactions of all of these calls to our service. For every example please take a moment to review the consumer's chapter if you think you’ve missed something. We can use curl for this or our browser. For the purpose of readability and to make a faster assessment of each call I think it’s better to use a browser. I used Chrome and the JSONView plugin for this. In some examples, I need to return objects of different types. Coincidently they are always pairs and that’s why I’m using the Spring Data Pair type to return these objects.
Example of ParallelFlux
In order to have a clean start, I decided to create an example 0 to show a couple of basic things that happen when making a request to the back end.
Let’s look at the getShellSlogans method. Make the following request:

http://localhost:8080/seashells/slogans
Enter fullscreen mode Exit fullscreen mode

In this method, I’m just mapping the results of a non-blocking call defined in method getAllSeaShells to its respective slogan. The reason I’m mapping them and not calling one method to get the slogan directly is so that we can see that, even though we have implemented it with several calls inside and several subscribers defined in the consumers, nothing changes in terms of performance. Now let’s check our logs. We should be able to see all the subtrees of the shells being called. In some cases the values are null, others empty and in other cases, the values are all there. In the following examples, I will explain this further. For the moment, The important part of this example is that our method is returning a ParallelFlux. In this case, the point is to see that we have achieved this and that every Pair we return is not following the actual order of Cardi B’s lyrics:

[
  {
    first: "Baltic tellin",
    second: "I need you right here 'cause every time you call"
  },
  {
    first: "Baltic tellin",
    second: "I play with this kitty like you play with your guitar"
  },
  ..
]
Enter fullscreen mode Exit fullscreen mode

We may get lucky and we may indeed get the whole lyrics right for the complete 16 slogan segment, but that is highly unlikely. We all know that Cardi B starts her rap with “Not too long ago, I was dancing for dollars (eeoow)”. What this shows is that ParallelFlux does not guarantee the order.

3.6.1. Example 1

In our blocking MVC example, we are always sure to return our data completely. It has been implemented in a traditional blocking fashion. Still, it is important at this point to have a look at what happens behind the scenes in such a blocking processing type:

blogcenter

Comparing this model with the reactive programming model we see an enormous difference in complexity. The blocking model seems more simple. However, what happens, in this case, is that the main Servlet thread for each request is completely blocked until we get a response back from our SOAP service. This will limit the ability of the service to cope in scenarios where high availability from the server is required.

3.6.2. Example 2

Let’s recall from the Services explanation that this example relies on the doOnNext consumer declarations to run. Let’s try calling this multiple times in our browser:

http://localhost:8080/seashells
Enter fullscreen mode Exit fullscreen mode

If we are lucky, we’ll be able to notice that sometimes we get all the data, but also that most o the time, much of the data is actually null! Let’s look at the following two sequence diagrams which explain what happens:

blogcenter

Above we can see a normal flow of data. We see that all subscribers are being called in parallel and that the doOnNext works perfectly and in sequence. By the time we get to our Netty subscriber, we are sure to get the seaShellDto object filled up with data.
In most cases we won’t get the full data:

blogcenter

From the above, we see that the event call back has been emitted before the doOnNext consumers have ended their execution. In this case, the seaShellDto object has not been completely filled. Therefore we will be getting nulls.

3.6.3. Example 3

Our almost reactive example encapsulates all our blocking implementations from Example 2 and creates a publisher with it. It becomes a non-blocking thread but still does not benefit from any parallelism. This is greatly the reason why such an easy approach is also so easily found to not be very efficient.

3.6.4. Example 4

The waiting approach of example 4 basically allows the publisher to delay its execution for however amount of time is needed. We have set up 100ms in our example. We can also configure that.
This is what happens:

blogcenter

Notice that with the 100ms delay, we guarantee that all doOnNext consumers are executed. However, we are also guaranteeing that we have a waiting time of 100ms for whatever request we make from the front end. This has a detrimental effect on performance.

3.6.5. Example 5

We could have gotten the idea that somehow, wrapping up a method in a publisher such as Mono or Flux, we would always create a non-blocking call. This is why I found the ForkJoin example a very good one. The act of joining a pool or submitting a Task is actually a blocking action on its own. This means that the simple fact that the ForkJoinPool is being used, renders the whole publisher a blocking publisher and blockhound will detect this publisher as being a blocking publisher.

blogcenter

In the above diagram, everything is as simplified as possible to get an overview as to what is happening behind the scenes. In this case, WebFlux cannot work independently of the main Servlet Thread per request.

3.6.6. Example 6

In example 6, we looked into a one-by-one type of implementation. Although fully reactive, it requires an extensive amount of interaction between the client and the server. This is because filling out a form with a Shell, would require one request per object type. This means, just to give an example, that we would retrieve one request for the shell, plus the amount of costumes times two, plus the number of persons times 2 because of the accounts, plus all the costumes per person requests and so on. For one shell alone, the number of requests could be quite a lot. This could, however, be something useful in the front-end, for example, for a page that loads per piece of data, or for a page where data needs to be cached and doesn’t need to be shown imediatelly. However, in our case, we do want to get everything at once, which invalidates this solution

3.6.7. Example 7

These examples match everything we discussed in our introduction. These two examples are non-blocking, completely WebFlux supported and they provide you with data as needed. They do have different paradigms but they have already been explained in point 3.5. In example 7, though paradigms changed and it’s important that we realize that in this implementation we have voided doing anything sequential. For example, we do not process pairs in the same thread or sub-threads. All the necessary threads in example 7, only fill up one object and they take the whole seaShellDto as a common shared resource. The following diagram is an overview of what happens during the processing of all publishers in example 7:

blogcenter

With the above example, what I am trying to explain with all the arrows is that one chained block is dependent on the other and that in reality, they do wait for each other individually. For example, we cannot start requesting a Top if we don’t have a Costume. We can however ask for the Top individually without considering the Lower. These two are completely separate publishers, which constantly walk through the seaShellDto objects in order to fill it up. There are no arguments being passed around through the different publishers created with the zip function. This is why, for this example, I’ve place seaShellDto in a location where it can be globally accessible through all the different publishers.

3.6.8. Example 8

In this final example, we look at the most perfect way to wrap our legacy service. In this case, we are using immutability. We will still take advantage of the usage of parallelizing the zip function. The difference is that we won’t be using any mutable variables. I wanted to explicitly show examples with the mutable parameters in order to illustrate different characteristics of programming in a reactive way with WebFlux. Java versions from Java 14 onwards, offer lots of possibilities and one of them is the usage of records. Records are just like classes, but they are immutable and they come with very convenient implementations of the hashcode and equals methods. Mutability has the enormous problem of potentially creating side effects and unpredictable code. Using records, we will be constantly creating new objects and returning them through the streams. The example of this implementation is located in a separate module at the root of the project. The name of the module is sea-shell-service-immutable. Since this solution is, at least in my view, the most perfect for our case, I have placed it in a completely separate module.

4. Listings

4.1. Rest Services

http://localhost:8080/seashells
http://localhost:8080/seashells/{id}
http://localhost:8080/seashells/slogans
http://localhost:8080/seashells/block
http://localhost:8080/seashells/block/{id}
http://localhost:8080/seashells/reactiveblock
http://localhost:8080/seashells/reactiveWithDelay
http://localhost:8080/seashells/reactiveWithForkJoins
http://localhost:8080/seashells/one/
http://localhost:8080/seashells/one/{id}
http://localhost:8080/seashells/one/person/{id}
http://localhost:8080/seashells/one/costume/{id}
http://localhost:8080/seashells/one/account/{id}
http://localhost:8080/seashells/one/top/{id}
http://localhost:8080/seashells/one/lower/{id}
http://localhost:8080/seashells/reactive
http://localhost:8080/seashells/reactive/{id}

4.2. Test Rest Services

http://localhost:8080/seashells/reactive/rootCostume/{idTop}/{idLower}
http://localhost:8080/seashells/reactive/rootShell/{idPerson}/{idCostume}
http://localhost:8080/seashells/reactive/rootCostumeSlowTop/{idTop}/{idLower}
http://localhost:8080/seashells/reactive/rootCostumeSlowLower/{idTop}/{idLower}


5. Conclusion

With this article, my main goal was to walk us through the processes I’ve been through to discover Reactive Programming and most of its intricacies when using this programming paradigm to wrap legacy services that for one or another reason, we are not allowed to change. In the end, our example 8 seems to be the better approach for whatever we want to develop. This way of programming is heavily based on Publisher, Subscriber principles. This can make code slightly more difficult to understand but on the other hand, it provides the advantage of allowing the server to support more requests at the same time. Imperative programming means that you can follow the code along a debug line. With the principles of reactive programming, we are basing all of our code on declarative programming. We are thus, creating code that although it may improve efficiency, does also require more effort to understand, change and update. WebFlux provides a framework that mitigates the complexity issue very well. As we have seen making publishers isn’t difficult at all. Neither is understanding them. Furthermore, we can easily debug them even though not doing so in a traditional way. Reactive programming is probably more often than not, the best decision to make. Let’s enumerate some of the reasons to move to a Reactive Programming paradigm and in this case with WebFlux:

  1. High availability — Requests should not block each other,
  2. High volume — We need to be able manage high volumes of data The simple answer to why do we need Reactive Programming is sometimes simplified as “It’s just faster!”.

As we were able to look in our previous examples, being faster is probably just a consequence of the server supporting more requests. An individual request will not present any difference. In fact, if we compare a simple request in a Web MVC application and the same one in a Reactive MVC application, we’ll probably see that the Web MVC request is actually faster on its own.
If we are thinking about changing our REST service model, my best advice is to just try it out. Let’s make benchmarking tests. Check our needs. Reactive Programming and WebFlux are amazing state-of-the-art technologies that should be used if we are managing high amounts of data. If we are managing legacy systems it can also be a very good potential first step to make everyone start thinking about a possible migration of the whole system to a better one.
Remember that Reactive Programming is only an aspect of Reactive Architecture. The Reactive Manifesto references resilience and availability. This is why Reactive Programming can be a part of a Reactive Architecture. It is not the only requirement. If we program reactively, it just means that we have contributed only to a very small part of a truly reactive architecture. There is still a long way to go before our system is truly Reactive.
Let’s make a quick recap of the terms we saw. Flux, ParallelFlux, and Mono are often used as publishers. The doOnNext method declares consumers to be executed after the publisher is done. The subscriber does not wait for them. Calling the subscriber allows us to implement functions that happen after the publisher is done with the eventHandler. The zip method, allows us to create one publisher out of others. The thenMany allow us to call another publisher after the current one, returning another publisher as a result. ForkJoinTasks do not work with WebFlux, because they use the main thread when joining the ForkJoinPool.
I trust that sharing my experience and my project, helps anyone interested in WebFlux to understand how this whole process works. I also hope that the examples I’ve placed on GitHub can help you see the difference between different implementations. Take note that my idea of creating a publisher chain of publishers using the zip function is just one of the many options WebFlux provides for chaining Publishers.
I hope you enjoyed this article as much as I did making it! Please leave a review, comments, or any feedback you want to give. I’m very grateful if you want to help me make this article better.
I have placed all the source code of this application on GitHub.
Thank you for reading!

6. References

Top comments (0)