DEV Community

Jorge
Jorge

Posted on • Originally published at jorge.aguilera.soy on

Jugando con Mono y Flux (de Reactor)

Cada día la programación funcional está más en boga y hay que ponerse las pilas, así que me he inventado este caso de uso para jugar un poco con ella y sobre todo por practicar con Mono y Fluxde Reactor (esta vez en Java)

WARNING

As usual, este post se basa en lo que yo he buscado y probado, pudiendo estar mal e incluso rematadamente mal, pero …​ en mi local funciona.

Caso de Uso

Supongamos que tenemos dos sistemas independientes (dos microservicios en tu arquitectura o dos servicios de dos proveedores diferentes, o uno tuyo y otro externo, por ejemplo):

  • Stock Service. Gestiona la cantidad de artículos que tenemos en el almacén

  • Ecommerce Service. Gestiona algunos de artículos junto con su cantidad disponible más el precio de venta (por ejemplo querríamos tener un catálogo de artículos y para cada uno querer tener en almacén 100 unidades pero en eCommerce solamente 10)

Ambos sistemas son independientes, con sus bases de datos correspondientes, etc y son accesibles a través de unos endpoints diferentes.

La idea es ejecutar un proceso que busque el stock actual de todos los artículos, llamando a Stock Service, aplicar una regla de negocio para cada uno, e invocar al endpoint de eCommerce para actualizar la cantidad de cada uno.

Además, como eCommerce dispone de un subconjunto de artículos, deberemos consultar en eCommerce la lista de artículos para no recorrer todo el catálogo que tenemos en Stock

Diagram

No parece muy difícil …​

Stock Service

    public record Stock(String sku, int cantidad) {
    }

    public record StockPage(int total, Collection<Stock> stocks) {
    }

    @Get("/")
    Mono<StockPage> get(@QueryValue int page)

    @Post("/update")
    void update(@Body List<Stock> stocks)
Enter fullscreen mode Exit fullscreen mode

Simplificando mucho vamos a trabajar con un Stock identificado por su sku y del que mantenemos la cantidad que tenemos en stock

StockPage nos sirve para hacer una paginación básica devolviendo el total de artículos en la base de datos, y unos cuantos stocks. El servicio devolverá páginas según se le pida por parámetro.

También añadimos un endpoint para poder actualizar el stock (que nos servirá para jugar a cambiar cantidades del stock y volver a ejecutar nuestro proceso)

WARNING

obviamente es un ejemplo muy simplificado

eCommerce Service

    public record Articulo(String sku, int cantidad, double precio) {
    }

    public record ArticuloPage(int total, Collection<Articulo> articulos) {
    }

    @Get("/articulos")
    public Mono<ArticuloPage> get(@QueryValue int page)

    @Post("/update")
    public Mono<List<UpdateStock>> update(@Body List<UpdateStock> update)
Enter fullscreen mode Exit fullscreen mode

Parecido al Stock Service, el eCommerce service nos permite paginar los artículos y (lo más importante en este caso) actualizar la cantidad de stock de articulos en ecommmerce

Aproximación tradicional

Siguiendo el diagrama anterior una posible implementación sería:

int offset = 0;
while( offset < maxArticles ){

    // obtener n articulos en ecomerce
    var articuloPage = ecommerceClient.get(offset).block();

    // extraer los ids
    var articulos = new ArrayList<String>();
    for(var articulo : articuloPage.articulos()){
        articulos.add(articulo.sku());
    }

    // ver el stock de estos articulos
    StockPage stockPage = stockClient.get( String.join(",",articulos) ).block();

    // logica de negocio para estos articulos
    List<UpdateStock> updateStocks = new ArrayList<>();
    for (Stock s : stockPage.stocks()) {
        updateStocks.add(new UpdateStock(s.sku(), s.cantidad()));
    }

    // actualizar ecommerce
    ecommerceClient.update(updateStocks).block();
    offset+=batchSize;
}
Enter fullscreen mode Exit fullscreen mode

Bueno, un poco engorroso pero (semi)fácil de seguir. Pedidos unos pocos articulos a ecommerce, pedimos la cantidad de estos al servicio de stock, y actualizamos ecommerce con la cantidad que tenemos en stock

El "detalle" de esta implementación es que estamos haciendo llamadas a servicios externos y estamos esperando a que se completen (por eso el uso de block() al final de las llamadas a endpoints)

Si este bucle se está ejecutando en el hilo de un Controller, por ejemplo, este hilo se queda bloqueado mientras se resuelven las llamadas pudiendo quedarse congelado si no disponemos de los hilos necesarios.

Implementación Reactiva (no bloqueante)

La implementación reactiva (y funcional) va a hacer uso de los objetos Mono y Flux (si usas reactor) y otros, los cuales en realidad NO ejecutan en ese momento la lógica que le indicamos, sino que las van "encadenando" y ejecutando en hilos diferentes coordinados por la implementación reactiva que usemos

Mono<ArrayList<UpdateStock>> reactive() {
    return Flux
        .generate(() -> 0, (offset, emitter) -> {
            if (offset < maxArticles) {
                emitter.next(offset);
            } else {
                emitter.complete();
            }
            return offset + batchSize;
        })
        .concatMap(page -> ecommerceClient.get((Integer) page))
        .map(ArticuloPage::articulos)
        .filter(items -> !items.isEmpty())
        .flatMap(items -> stockClient.get(items.stream().map(Articulo::sku).collect(Collectors.joining()))
                .map(stocks -> stocks.stocks()
                        .stream()
                        .map(s -> new UpdateStock(s.sku(), s.cantidad()))
                        .toList()))
        .concatMap(ecommerceClient::update)
        .reduce(new ArrayList<>(), (prev, items) -> {
            prev.addAll(items);
            return prev;
        });
}
Enter fullscreen mode Exit fullscreen mode
INFO

No he conseguido resolver de forma satisfactoria el hacer una primera llamada que me de el numero de páginas que hay en eCommerce por lo que voy a hacer un número de llamadas "fijas" y luego filtrar las respuestas que no contengan datos.

Esta implementación:

  • genera un Flux de números entre 0 y N

  • cada page es mapeado a una llamada asíncrona al eCommerce

  • De lo que devuelva (en un futuro) el eCommerce, nos quedamos con la lista de artículos

  • filtramos aquellas respuestas que vienen vacía

  • Cada lista de artículos en eCommerce es convertida a una llamada al servicio de stock

  • Cada lista de stocks que nos devolverá (en un futuro) el stock la usaremos para actualizar eCommerce

  • Por último, juntaremos todas las respuestas (futuras) de eCommerce en un List para devolver la lista de articulos actualizados

Repositorio

He creado un repositorio con una aplicación Micronaut por si quieres jugar con estos servicios y las implementaciones.

https://github.com/jagedn/reactor-demo

Para simplificar la implementación los dos servicios (stock y ecommerce) se encuentran en la misma aplicación, pero de forma independiente. Cada uno se accede por unos endpoints diferentes y mantienen una "base de datos" independiente (la base de datos es un simple List en memoria)

Una vez clonado el repositorio podremos ejecutar la aplicacion:

./gradlew run

y acceder a los servicios

http localhost:8080/ecommerce/articulos, para ver la situacion actual de ecommerce

http localhost:8080/stock/, para ver la situacion actual del stock

http -v localhost:8080/stock/update [0][sku]=1 [0][cantidad]=1 para actualizar la cantidad del artículo "1" en la base de datos del stock

En este punto, eCommerce tiene una cantidad diferente que Stock así que ejecutaremos la lógica de negocio que las "sincroniza"

http localhost:8080/ejemplo/block

o

http localhost:8080/ejemplo/reactive

Si todo ha ido bien, la cantidad disponible en stock habrá sido actualizada en eCommerce

Top comments (0)