Usama Ashraf

Microservices & RabbitMQ On Docker

A microservices-based architecture involves decomposing your monolithic app into multiple services that can be deployed and scaled independently. Beyond this base definition, what constitutes a microservice can be somewhat subjective, though there are several battle-tested practices adopted by giants like Netflix and Uber that should always be considered, and I'll discuss some of them. Ultimately, we want to divide our app into smaller apps, each of which is a system of its own that deals with only one aspect of the whole app and does it really well. This decomposition is a very consequential step and can be done on the basis of subdomains, which have to be identified correctly. The smaller apps are more modular and manageable, have well-defined boundaries, can be written in different languages/frameworks, and fail in isolation so that the entire app doesn't go down (no single point of failure). Take a cinema ticketing example:


Source: https://codeburst.io/build-a-nodejs-cinema-api-gateway-and-deploying-it-to-docker-part-4-703c2b0dd269

Let's break down this bird's eye view:

i) The user app can be a mobile client, an SPA, or any other client consuming our backend services.

ii) It's considered a bad practice to ask our clients to communicate with each of our services separately, for reasons I tried to explain here. This is what the API gateways are for: to receive client requests, call our service(s), return a response. Thus the client only has to talk to one server, giving the illusion of a monolith. Multiple gateways may be used for different kinds of clients (mobile apps, tablets, browsers etc.). They can and should be responsible for limited functionality like merging/joining responses from services, authentication, ACLs. In large applications, which need to scale and move dynamically, gateways also need access to a Service Registry which holds the locations of our microservice instances, databases etc.

iii) Each service has its own storage. This is a key point and ensures loose coupling. Some queries will then need to join data that is owned by multiple services. To avoid this major performance hit, data may be replicated and sharded. This principle is not just tolerated in microservices, but encouraged.

iv) The REST calls made to our API gateway are passed to the services, which in turn talk to other services and return a result to the gateway which, perhaps, compiles it and responds with it to the client. Ideally, this kind of communication among services should not happen while serving a single client request. Otherwise we'll be sacrificing performance, on account of the extra HTTP round-trips, for the newly introduced modularity.

A single request should, ideally, only invoke one service to fetch the response. This means any synchronous requests between the services should be minimized, and that's not always possible; mechanisms like gRPC, Thrift or even simple HTTP (as in our example) are commonly employed when necessary. As you may have guessed, this implies that data will have to be replicated across our services. Say, the GET /catalog/<<cityId>> endpoint is also supposed to return the premieres at each cinema in the city at that time. With our new strategy, the premieres will have to be stored in the database for the Cinema Catalog service as well. Hence, point iii).

Communicating Asynchronously Between The Services

So, say, the premieres change as a result of some CRUD operation on the Movies service. To keep the data in sync, that update event will have to be emitted and applied to the Cinema Catalog service as well. Try to picture our microservices as a cluster of state machines where updates in states may have to be communicated across the cluster to achieve eventual consistency. Of course, we should never expect our end-users to wait longer for requests to finish and sacrifice their time for modularity that only benefits us. Therefore, all of this communication has to be non-blocking. And that's where RabbitMQ comes in.

RabbitMQ is a very powerful message broker that implements the AMQP messaging protocol. Here's the abstract: first, you install a RabbitMQ server instance (broker) on a system. Then a publisher/producer program connects to this server and sends out a message. RabbitMQ queues that message and delivers it to one or more subscriber/consumer programs that are listening on the RabbitMQ server.

Before I get to the crux of this article, I want to explicitly declare that microservices are far more complex than this article can convey. We won't be covering critical topics like fault tolerance (a hard problem given the intricacy of distributed systems), the full role of the API gateway, Service Discovery, data consistency patterns like Sagas, preventing cascading service failures using Circuit Breakers, health checks and architectural patterns like CQRS. Not to mention how to decide whether microservices will work for you at all.

How RabbitMQ Works

More specifically, the messages are published to an exchange inside the RabbitMQ broker. The exchange then distributes copies of that message to queues on the basis of certain developer-defined rules called bindings. This part of the messages' journey is called routing. And this indirection is of course what makes for the non-blocking message transmissions. Consumers listening on those queues that got the message will receive it. Pretty simple, right?

Not exactly. There are four different types of exchanges and each, along with the bindings, defines a routing algorithm. "Routing algorithm" means, essentially, how the messages are distributed among the queues. Going into the details of each type might be overkill here, so I'll just expand on the one we'll be using: the topic exchange.

For an exchange to push a message onto a queue, that queue must be bound to the exchange. We can explicitly create multiple exchanges with unique names. However, a freshly deployed RabbitMQ broker also comes with a default, nameless exchange, and every queue we create is automatically bound to it, using the queue's name as the binding key. To be descriptive, I'll create a named exchange manually and then bind a queue to it. This binding is defined by a binding key. The exact way a binding key works, again, depends on the type of the exchange. Here's how it works with a topic exchange:

  • A queue is bound to an exchange using a string pattern (the binding key)
  • The published message is delivered to the exchange along with a routing key
  • The exchange checks which queues match the routing key based on the binding key pattern defined before.

* can substitute for exactly one word. # can substitute for zero or more words.

Source: https://www.rabbitmq.com/tutorials/tutorial-five-python.html

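In the diagram from the RabbitMQ tutorial linked above, Q1 is bound with "*.orange.*" and Q2 is bound with both "*.*.rabbit" and "lazy.#". Any message with a routing key "quick.orange.rabbit" will be delivered to both queues. However, messages with "lazy.brown.fox" will only reach Q2. Those with a routing key not matching any pattern will be lost.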
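To make the wildcard rules concrete, here is a minimal sketch of topic matching in plain Python. It is not RabbitMQ's actual implementation, and the function name topic_matches is my own; it only mirrors the two rules stated above (* is exactly one word, # is zero or more words).

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key matches the topic pattern in binding_key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        # Pattern exhausted: it is a match only if every word was consumed too.
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more words, so try every possible split.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        # Any other pattern element needs a word to compare against.
        if w == len(words):
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


if __name__ == "__main__":
    for key in ("quick.orange.rabbit", "lazy.brown.fox"):
        print(key, topic_matches("*.orange.*", key), topic_matches("lazy.#", key))
```

With this helper, "quick.orange.rabbit" matches both "*.orange.*" and "*.*.rabbit", while "lazy.brown.fox" only matches "lazy.#".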

For some perspective, let's just skim over two other exchange types:

  • Fanout exchange: Messages sent to this kind of exchange will be sent to ALL the queues bound to it. The routing key, if provided, will be completely ignored. This can be used, for example, for broadcasting global configuration updates across a distributed system.
  • Direct exchange (simplest): Sends the message to every queue whose binding key is exactly equal to the given routing key. If there are multiple consumers listening on a queue, then the messages will be load-balanced among them, hence, it is commonly used to distribute tasks between multiple workers in a round robin manner.
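The difference between these two types can be sketched with a toy, in-memory model. Everything here (the Exchange class, dispatch_round_robin, the queue names) is hypothetical and only illustrates the routing rules described above; a real broker also takes acknowledgments and prefetch settings into account when handing messages to consumers.

```python
import itertools


class Exchange:
    """Toy in-memory exchange supporting the 'direct' and 'fanout' types."""

    def __init__(self, kind: str):
        if kind not in ("direct", "fanout"):
            raise ValueError(f"unsupported exchange type: {kind}")
        self.kind = kind
        self.bindings = []  # list of (binding_key, queue_name) pairs
        self.queues = {}    # queue name -> list of delivered messages

    def bind(self, queue_name: str, binding_key: str = "") -> None:
        self.bindings.append((binding_key, queue_name))
        self.queues.setdefault(queue_name, [])

    def publish(self, routing_key: str, body: str) -> None:
        # Fanout ignores the routing key; direct requires an exact match.
        targets = {
            queue_name
            for binding_key, queue_name in self.bindings
            if self.kind == "fanout" or binding_key == routing_key
        }
        for queue_name in targets:
            self.queues[queue_name].append(body)


def dispatch_round_robin(messages, workers):
    """Hand the messages of one queue to its consumers in turn."""
    assignment = {worker: [] for worker in workers}
    for message, worker in zip(messages, itertools.cycle(workers)):
        assignment[worker].append(message)
    return assignment
```

Publishing "hi" with the routing key "email" to a direct exchange only reaches queues bound with "email", whereas a fanout exchange copies the message to every bound queue whatever the routing key is.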

My illustration will be very simple: a Python Flask app with a single POST endpoint, which, when called, will purport to update a user's info, emit a message to the RabbitMQ broker (without waiting for any consumer to process it) and return a 201. A separate Go service will be listening for the message from the broker and hence have the chance to update its data accordingly. All three will be hosted in separate containers.

Setting Up Our Containerized Microservices & Broker Using Docker Compose

Provisioning a bunch of containers and all that goes along with them can be a pain, so I always rely on Docker Compose.

Here's the entire code. We're declaring three services that will be used for the three containers. The two volumes are needed for putting our code inside the containers:



# docker-compose.yml

version: "3.2"
services:
    rabbitmq-server:
        build: ./rabbitmq-server

    python-service:
        build: ./python-service
        # 'rabbitmq-server' will be available as a network reference inside this service.
        # depends_on only controls start order: this container starts after the RabbitMQ
        # container does, but Compose does not wait for the broker to be ready.
        depends_on:
            - rabbitmq-server
        # Keep it running.  
        tty: true
        # Map port 3000 on the host machine to port 3000 of the container.
        # This will be used to receive HTTP requests made to the service.
        ports:
            - "3000:3000"
        volumes:
            - './python-service:/python-service'

    go-service:
        build: ./go-service
        depends_on:
            - rabbitmq-server
        tty: true
        volumes:
            - './go-service:/go-service'

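# Named volumes. The services above share code through bind mounts
# (./python-service and ./go-service), so these are not actually referenced.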
volumes:
    python-service:
    go-service:



The Dockerfiles are pretty much the standard ones from Docker Hub, to which I've added:

  • /go-service working directory in the Go service container.
  • /python-service working directory in the Python service container.
  • Go's RabbitMQ client library called amqp
  • Python's RabbitMQ client Pika & Flask
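One caveat about depends_on in the Compose file above: it orders container startup but does not wait until the broker actually accepts connections. A minimal sketch of a workaround, assuming a helper of my own called wait_for_port, is to poll the broker's TCP port before connecting. An open port is only a rough signal and does not guarantee that RabbitMQ has finished booting.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Poll host:port until a TCP connection succeeds or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # The connection is closed again right away; we only test reachability.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


if __name__ == "__main__":
    # 'rabbitmq-server' and 5672 are the host name and port used in this article.
    print("broker reachable:", wait_for_port("rabbitmq-server", 5672))
```

A service could call this once at startup and exit with an error if it returns False.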

Our Flask app has just one endpoint that receives a user_id and a full_name, which will be used to update the user's profile. A message indicating this update will then be sent to the RabbitMQ broker.



# main.py

from flask import Flask, jsonify, request
from services.user_event_handler import emit_user_profile_update

app = Flask(__name__)

@app.route('/users/<int:user_id>', methods=['POST'])
def update(user_id):
    new_name = request.form['full_name']

    # Update the user in the datastore using a local transaction...

    emit_user_profile_update(user_id, {'full_name': new_name})

    return jsonify({'full_name': new_name}), 201



The logic for emitting events to other services should always be separate from the rest of the app, so I've extracted it to a module. The exchange should be explicitly checked for and created by the publisher and consumer both, because we can't know (nor should we rely on) which service starts first. This is a good practice, which most RabbitMQ client libraries facilitate seamlessly:



# services/user_event_handler.py

import pika
import json

def emit_user_profile_update(user_id, new_data):
    # 'rabbitmq-server' is the network reference we have to the broker, 
    # thanks to Docker Compose.
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq-server'))
    channel    = connection.channel()

    exchange_name = 'user_updates'
    routing_key   = 'user.profile.update'

    # This will create the exchange if it doesn't already exist.
    channel.exchange_declare(exchange=exchange_name, exchange_type='topic', durable=True)

    new_data['id'] = user_id

    channel.basic_publish(exchange=exchange_name,
                          routing_key=routing_key,
                          body=json.dumps(new_data),
                          # Delivery mode 2 marks the message as persistent, so the broker
                          # writes it to disk. It can then survive a broker restart, provided
                          # the queue it was routed to is durable as well.
                          properties=pika.BasicProperties(
                            delivery_mode = 2,
                        ))

    print("%r sent to exchange %r with data: %r" % (routing_key, exchange_name, new_data))
    connection.close()



Don't get confused by channel. A channel is just a virtual, lightweight connection within the TCP connection. It is meant to prevent opening multiple, expensive TCP connections, especially in multithreaded environments.
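The emit_user_profile_update function above opens and closes a TCP connection for every event. As a rough sketch of the reuse that channels are meant to enable, the hypothetical UserEventPublisher class below keeps one connection and one channel open and publishes through them. The class and the connect_to_broker helper are my own names, not part of Pika. Keep in mind that Pika's BlockingConnection is not thread-safe, so in a multithreaded server you would want one such publisher per thread, and a long-idle connection may be dropped by the broker.

```python
import json


def connect_to_broker():
    """Default factory: opens a connection to the broker used in this article."""
    import pika  # imported lazily so the class below can be exercised without Pika
    return pika.BlockingConnection(pika.ConnectionParameters(host="rabbitmq-server"))


class UserEventPublisher:
    """Keeps one connection and channel open and reuses them for every publish."""

    def __init__(self, connect=connect_to_broker, exchange_name="user_updates"):
        # 'connect' is any zero-argument callable returning a Pika-style connection.
        self._connect = connect
        self._exchange_name = exchange_name
        self._connection = None
        self._channel = None

    def _ensure_channel(self):
        # (Re)connect only when there is no usable connection.
        if self._connection is None or self._connection.is_closed:
            self._connection = self._connect()
            self._channel = self._connection.channel()
            self._channel.exchange_declare(
                exchange=self._exchange_name, exchange_type="topic", durable=True
            )
        return self._channel

    def publish(self, routing_key, payload, properties=None):
        self._ensure_channel().basic_publish(
            exchange=self._exchange_name,
            routing_key=routing_key,
            body=json.dumps(payload),
            properties=properties,
        )

    def close(self):
        if self._connection is not None and not self._connection.is_closed:
            self._connection.close()
```

The connection factory is passed in so that the class can be tried out with a stand-in object when no broker is running.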

The durable parameter ensures that the exchange is persisted to the disk and can be restored if the broker crashes or goes offline for any reason. The publisher (Python service) creates an exchange named user_updates and sends it the user's updated data with user.profile.update as the routing key. This will be matched with a user.profile.* binding key, which our Go service will define:



// main.go

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        // Exit the program.
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    // 'rabbitmq-server' is the network reference we have to the broker, 
    // thanks to Docker Compose.
    conn, err := amqp.Dial("amqp://guest:guest@rabbitmq-server:5672/")
    failOnError(err, "Error connecting to the broker")
    // Make sure we close the connection whenever the program is about to exit.
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    // Make sure we close the channel whenever the program is about to exit.
    defer ch.Close()

    exchangeName := "user_updates"
    bindingKey   := "user.profile.*"

    // Create the exchange if it doesn't already exist.
    err = ch.ExchangeDeclare(
            exchangeName,   // name
            "topic",        // type
            true,           // durable
            false,          // auto-delete
            false,          // internal
            false,          // no-wait
            nil,            // arguments
    )
    failOnError(err, "Error creating the exchange")

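    // Create the queue if it doesn't already exist.
    // This does not need to be done in the publisher because the
    // queue is only relevant to the consumer, which subscribes to it.
    // Like the exchange, let's make it durable (saved to disk) too.
    // The queue gets a fixed name (an illustrative one) so that a restarted
    // consumer re-attaches to the same queue. With an empty name the broker
    // would assign a new random name, and hence a new queue, on every start.
    q, err := ch.QueueDeclare(
            "go_service.user_updates", // name (illustrative)
            true,  // durable
            false, // auto-delete: delete when the last consumer unsubscribes
            false, // exclusive
            false, // no-wait
            nil,   // arguments
    )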
    failOnError(err, "Error creating the queue")

    // Bind the queue to the exchange based on a string pattern (binding key).
    err = ch.QueueBind(
            q.Name,       // queue name
            bindingKey,   // binding key
            exchangeName, // exchange
            false,        // no-wait
            nil,          // arguments
    )
    failOnError(err, "Error binding the queue")

    // Subscribe to the queue.
    msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer id - empty means a random, unique id will be assigned
            false,  // auto acknowledgement of message delivery
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // arguments
    )
    failOnError(err, "Failed to register as a consumer")


    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received message: %s", d.Body)

            // Update the user's data on the service's 
            // associated datastore using a local transaction...

            // The 'false' indicates the success of a single delivery, 'true' would
            // mean that this delivery and all prior unacknowledged deliveries on this
            // channel will be acknowledged, which I find no reason for in this example.
            d.Ack(false)
        }
    }()

    fmt.Println("Service listening for events...")

    // Block until 'forever' receives a value, which will never happen.
    <-forever
}



RabbitMQ uses port 5672 by default for non-TLS connections and "guest" as the username & password. You can study the plethora of configuration options available and how to use them with Pika and Go amqp.

You might be wondering what this line is for: d.Ack(false)

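This tells the broker that the message has been delivered, processed successfully and can be deleted. Had we enabled automatic acknowledgment, the broker would consider a message acknowledged as soon as it was delivered. But we passed false for the auto-acknowledgment argument when we subscribed to the queue with ch.Consume, so we have to acknowledge each delivery ourselves.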

Now, if the Go service crashes (for any unforeseeable reason) before acknowledging, the broker will notice the closed connection and re-queue the message so that it may be given another chance to be processed.
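For comparison, here is a sketch of the same manual-acknowledgment idea written as a Pika-style consumer callback. The make_on_message helper and the apply_update argument are hypothetical names of mine; the callback signature (channel, method, properties, body) and the basic_ack / basic_nack calls are the ones Pika uses. Unlike the Go example, this version explicitly hands a failed message back to the broker instead of relying on the connection dropping.

```python
import json


def make_on_message(apply_update):
    """Build a Pika-style callback that acks only after apply_update succeeds."""

    def on_message(channel, method, properties, body):
        try:
            # Update the local datastore; any exception counts as a failure.
            apply_update(json.loads(body))
        except Exception:
            # Processing failed: return the message to the queue for another attempt.
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
            return
        # Processing succeeded: the broker may now delete the message.
        channel.basic_ack(delivery_tag=method.delivery_tag)

    return on_message
```

With Pika 1.x this callback would be registered along the lines of channel.basic_consume(queue=queue_name, on_message_callback=on_message, auto_ack=False). Note that blindly re-queueing every failure means a message that can never be processed (malformed JSON, for instance) will be redelivered indefinitely.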

Launching The Microservices

Alright, let's fire 'em up:

Run docker-compose up

When the three services have been built (this will take at least a few minutes the first time), check for their names using docker ps.

Open two new terminals, open a shell in the Python and Go containers using the respective container names and start the servers:

docker exec -it microservicesusingrabbitmq_python-service_1 bash
FLASK_APP=main.py python -m flask run --port 3000 --host 0.0.0.0

docker exec -it microservicesusingrabbitmq_go-service_1 bash
go run main.go

Open a third terminal to send the POST request. I'll use curl:

curl -d "full_name=usama" -X POST http://localhost:3000/users/1

And you'll see the transmission in the Python and Go terminals.

At any point, you may also open a shell in the RabbitMQ container and just look around:

  • rabbitmqctl list_exchanges (list all the exchanges on this broker node)
  • rabbitmqctl list_queues (list all the queues on this broker node)
  • rabbitmqctl list_bindings (list all the bindings on this broker node)
  • rabbitmqctl list_queues name messages_ready messages_unacknowledged (list all the queues with the number of messages each has that are ready to be delivered to clients but not yet delivered and those delivered but not yet acknowledged)

As I mentioned at the start, this is by no means a deep dive into microservices. There are many questions to be asked and I'll try to answer an important one: how do we make this communication transactional? What happens if our Go service (consumer) throws an exception while updating the state on its end and we need to make sure that the update event rolls back across all the services that were affected by it? Imagine how complicated this could get when we have several microservices and thousands of such "update events". Essentially, we'll need to incorporate separate events that perform the rollback.

In our case, if the Go service throws an exception while updating the data, it will have to send a message back to the Python service telling it to roll back the update. It's also important to note that in the case of these kinds of errors, the message delivery will have to be acknowledged (even though the processing wasn't successful), so that the message doesn't get re-queued by the broker. When writing our consumer, we'll have to decide which errors mean that the message should be re-queued (tried again) and which mean that the message should not be re-queued and just rolled back.
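That decision can be sketched as a small function. Everything below is hypothetical: the two exception classes, the handle_delivery helper and the "user.profile.update.rollback" routing key are names I made up for illustration, and the sketch assumes the message body is valid JSON containing an id. The rollback key deliberately has four words so that it would not match the consumer's own user.profile.* binding.

```python
import json


class TransientError(Exception):
    """Hypothetical: a failure worth retrying, e.g. the datastore is briefly unreachable."""


class PermanentError(Exception):
    """Hypothetical: a failure that retrying cannot fix, e.g. the data is invalid."""


def handle_delivery(body, apply_update, publish):
    """Process one delivery and return 'ack' or 'requeue' for the caller to act on."""
    event = json.loads(body)
    try:
        apply_update(event)
    except TransientError:
        # Leave the message with the broker so it is tried again.
        return "requeue"
    except PermanentError as exc:
        # Retrying is pointless: ask the publisher to roll back, then acknowledge
        # the delivery so the broker does not re-queue it.
        publish("user.profile.update.rollback", {"id": event["id"], "reason": str(exc)})
        return "ack"
    return "ack"
```

The caller would translate the returned string into the client library's acknowledge or negative-acknowledge call.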

But how do we specify which update event to roll back and how exactly would the rollback happen? The Saga pattern along with event sourcing is widely used to ensure such data consistency.

A Few Words On Designing The Broker

Consider two things: the types of exchanges to use and how to group the exchanges.

If you need to broadcast certain kinds of messages to all the services in your system, have a look at the fanout exchange type. Then, one way to group the exchanges could be on the basis of events e.g. three fanout exchanges named user.profile.updated, user.profile.deleted, user.profile.added. This might not be what you want all the time since you might end up with too many exchanges and won’t be able to filter messages for specific consumers without creating a new exchange, queue and binding.

Another way could be to create topic exchanges in terms of entities in your system. So in our first example, user, movie, cinema etc could be entities and, say, queues bound to user could use binding keys like user.created (get message when a user is created), user.login (get message when a user has just logged in), user.roles.grant (get message telling that the user has been given an authorization role), user.notify (send the user a notification) etc.

Always use routing to filter and deliver messages to specific consumers. Writing code to discard certain incoming messages while accepting others is an anti-pattern. Consumers should only receive messages that they require.

Finally, if your needs are complex and you require that messages be filtered down to certain consumers on the basis of multiple properties, use a headers exchange.
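To give a feel for how a headers exchange filters, here is a minimal sketch of its matching rule in plain Python. It is not RabbitMQ's implementation and the function name headers_match is my own. It assumes the two classic modes selected by the x-match binding argument: "all" (every listed header must match, the default) and "any" (at least one must match), with x-prefixed keys left out of the comparison.

```python
def headers_match(binding_args: dict, message_headers: dict) -> bool:
    """Return True if message_headers satisfy the binding arguments."""
    mode = binding_args.get("x-match", "all")
    # Keys such as 'x-match' configure the binding and are not compared.
    required = {k: v for k, v in binding_args.items() if not k.startswith("x-")}
    hits = [
        key in message_headers and message_headers[key] == value
        for key, value in required.items()
    ]
    return any(hits) if mode == "any" else all(hits)


if __name__ == "__main__":
    binding = {"x-match": "all", "entity": "user", "action": "created"}
    print(headers_match(binding, {"entity": "user", "action": "created", "region": "eu"}))
```

A message can carry extra headers without affecting the result; only the headers named in the binding are checked.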

Enjoy!

Top comments (8)

Seung-Jin Sul • Edited

Hi Usama, Thanks for useful article! I was wondering if you can help to test it. I've cloned it and hit "docker-compose up" and found the below error. Any idea?

  • apt-cache show+ grep -q erlang-base-hipe Package: erlang-base-hipe

  • apt-get install -y --no-install-recommends erlang-base-hipe
    Reading package lists...
    Building dependency tree...
    Reading state information...
    Some packages could not be installed. This may mean that you have
    requested an impossible situation or if you are using the unstable
    distribution that some required packages have not yet been created
    or been moved out of Incoming.
    The following information may help to resolve the situation:

The following packages have unmet dependencies:
erlang-base-hipe : Depends: libtinfo6 (>= 6) but it is not installable
E: Unable to correct problems, you have held broken packages.
ERROR: Service 'rabbitmq-server' failed to build: The command '/bin/sh -c set -eux; apt-get update; if apt-cache show erlang-base-hipe 2>/dev/null | grep -q 'Package: erlang-base-hipe'; then apt-get install -y --no-install-recommends erlang-base-hipe ; fi; apt-get install -y --no-install-recommends erlang-asn1 erlang-crypto erlang-eldap erlang-inets erlang-mnesia erlang-nox erlang-os-mon erlang-public-key erlang-ssl erlang-xmerl ; rm -rf /var/lib/apt/lists/*' returned a non-zero code: 100

Usama Ashraf

Hey Seung-Jin, thanks so much. Can I please know what kind of system and environment you're working with?

Peter Hasitschka

Hi Usama!

First, I also want to thank you for that great introduction to a complex but fascinating topic.
While trying out your code I ran into exactly the same issue that Seung-Jin described.
I'm running Docker 18.03 on Ubuntu 18.04.

However I managed to get rid of that error by replacing your rabbitmq-image with the native one.
For others who are faced with that error, just replace the rabbitmq-server part in the docker-compose.yml with the following snippet.

rabbitmq-server:
        image: rabbitmq
        hostname: "rabbitmq-server"
        environment:
            RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG"
            RABBITMQ_DEFAULT_USER: "guest"
            RABBITMQ_DEFAULT_PASS: "guest"
            RABBITMQ_DEFAULT_VHOST: "/"
        ports:
            - "15672:15672"
            - "5672:5672"
        labels:
            NAME: "rabbitmq-server"

This allowed me to successfully test your environment :-)

Best,
Peter

Usama Ashraf

Thank you Peter!

Tran Tien

Thank you for diagram!

Kishore Sahoo

@usamaashraf Do we need three droplets in AWS or DO for Python, Golang & RabbitMQ services or we can keep three services in one droplet? Thanks

Priyank Trivedi

You can keep three services in one droplet itself.

Edelberto Enrique Reyes

Thanks for sharing. It's really interesting. You explain microservices in a really good way.
Congratulations.